Оператором в Kubernetes принято называть развертывание, которое самостоятельно управляет ресурсами кластера, регистрирует новые Custom Resource Definition (CRD) и, в некоторых случаях, добавляется для наблюдения за существующими ресурсами (через механизмы Dynamic Admission Control). В этой статье на примере создания оператора для развертывания и управления кластером Aerospike мы попробуем разобраться с этапами создания оператора, способами взаимодействия с кластером и проблемами, с которыми можно встретиться в реальной практике. Всех практикующих DevOps и желающих поднять автоматизацию развертывания своих сервисов на новый уровень приглашаю под кат.

Прежде всего нужно определить, какую функциональность будет поддерживать будущий оператор. Поскольку база данных Aerospike допускает работу в режиме кластера, будет разумно создать CRD для описания характеристик кластера (количество подов, ограничения по памяти и процессору и др), конфигурации пространства имен, а также настройки резервного копирования. Также оператор должен отслеживать изменения в существующих ресурсах, связанных с ним, и динамически изменять топологию системы и/или настройки подов.

Для разработки оператора мы будем использовать язык программирования Go и набор инструментов Operator SDK, который нужно будет предварительно установить:

git clone https://github.com/operator-framework/operator-sdk
cd operator-sdk
make install

После установки SDK можно выполнить инициализацию заготовки проекта оператора:

operator-sdk init --domain dzolotov.tech --owner "Dmitrii Zolotov"

Поскольку мы предполагаем создание дополнительных типов ресурсов, необходимо определить идентификатор версии API, который представляет собой URI на произвольном домене. Для удобства использования operator-sdk можно сразу установить поддержку автодополнения в shell Debian/Ubuntu:

apt install bash-completion
echo 'source <(operator-sdk completion bash)' >>~/.bashrc

Operator SDK поддерживает создание операторов для разных DevOps-решений, включая Ansible и Helm, для этого при инициализации можно указывать необходимые плагины. Также инструмент командной строки позволяет выполнять развертывание оператора (operator-sdk olm), запуск оператора в выбранном контексте (operator-sdk run) и выполнять тесты (operator-sdk scorecard). Также можно создавать фрагменты кода для реализации разных типов Kubernetes-ресурсов. Начнем с создания версии apiVersion:

operator-sdk create api --group aerospike --version v1alpha1 --kind AerospikeCluster --resource --controller

Генератор создает несколько файлов для взаимодействия с Kubernetes API:

  • api/v1alpha1/aerospikecluster_types.go - описание типов (CRD);

  • controllers/aerospikecluster_controller.go - контроллер для создания и отслеживания типов CRD;

  • /go.mod - содержит зависимости для подключения к API (k8s.io/apimachinery - доступ к API для управления ресурсами, k8s.io/client-go - Go-клиент для взаимодействия с API, sigs.k8s.io/controller-runtime - базовые классы для реализации контроллера, github.com/onsi/ginkgo - тестовый фреймворк для проверки оператора).

Попробуем сгенерировать манифесты для Kubernetes, которые будут определять Custom Resource Definition и развертывания оператора: make manifests. Результатом выполнения будет генерация файлов в crd/bases - регистрация CRD-ресурсов (apiextensions.k8s.io/v1/CustomResourceDefinition) с названием aerospikeclusters.aerospike.dzolotov.tech для создаваемого API (группа aerospike.my.domain, тип ресурса AerospikeCluster). По умолчанию схема ресурса содержит строковое поле foo, далее мы переопределим схему в коде описания типов на Go:

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.8.0
  name: aerospikeclusters.aerospike.my.domain
spec:
  group: aerospike.dzolotov.tech
  names:
    kind: AerospikeCluster
    listKind: AerospikeClusterList
    plural: aerospikeclusters
    singular: aerospikecluster
  # будет локальным для namespace (может быть Cluster для создания ресурса на уровне кластера, без привязки к namespace)
  scope: Namespaced
  versions:
  # создаваемая версия CRD
  - name: v1alpha1
    schema:
      # описание схемы yaml-документа
      openAPIV3Schema:
        description: AerospikeCluster is the Schema for the aerospikeclusters API
        properties:
          # версия api (позволит делать разные версии для API)
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          # тип ресурса (в нашем случае будет AerospikeCluster)
          kind:
            description: 'Kind is a string value representing the REST resource this
              object represents. Servers may infer this from the endpoint the client
              submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          # описание метаданных (.meta) - определяется по схеме metav1.ObjectMeta
          metadata:
            type: object
          # описание спецификации кластера (.spec) - в соответствии со схемой
          spec:
            description: AerospikeClusterSpec defines the desired state of AerospikeCluster
            properties:
              # свойства спецификации кластера (.spec)
              foo:
                description: Foo is an example field of AerospikeCluster. Edit aerospikecluster_types.go
                  to remove/update
                type: string
            type: object
          # назначение ключа объекта yaml .status (пустой объект)
          status:
            description: AerospikeClusterStatus defines the observed state of AerospikeCluster
            type: object
        type: object
    # флаг актуальности версии CRD
    served: true
    # версия по умолчанию
    storage: true

Примером корректного ресурса с использованием указанного CRD:

apiVersion: aerospike.my.domain/alpha1
kind: AerospikeCluster
metadata:
  name: demo
spec:
  foo: bar
status: {}

Прежде чем мы пойдем рассматривать код контроллера дополним схему данных и добавим следующие поля:

  • deployments (int) - количество серверов в кластере;

  • maxMemory (int) - ограничение по размеру памяти (в Гб);

  • maxCPU (string) - ограничение по использованию процессора;

  • logging (string) - уровень протоколирования;

  • compressionLevel (int) - уровень сжатия;

  • replicationFactor (int) - фактор репликации для кластера;

  • strongConsistency (bool) - включение режима строгой согласованности (в ущерб производительности);

  • heartbeat:

    • interval (int) - интервал для проверки доступности соседних узлов (в миллисекундах).

  • keepalive:

    • enabled (bool) - разрешено использование keepalive-сообщений;

    • interval (int) - интервал в секундах между сообщениями.

Для демонстрации такого набора будет достаточно, в реальном кластере разумеется нужно предусмотреть наличие всех необходимых опций для конфигурации узлов. Структура данных для спецификации кластера определяется в файле api/alphav1/aerospikecluster_types.go, сейчас там даны определения для всех создаваемых типов данных (кластер, список кластеров и связанные с ними типы):

package v1alpha1

import (
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

//ожидаемое состояние кластера (спецификация)
type AerospikeClusterSpec struct {
        Foo string `json:"foo,omitempty"`
}

//текущее состояние кластера
type AerospikeClusterStatus struct {
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

type AerospikeCluster struct {
        metav1.TypeMeta   `json:",inline"`
  //стандартное определение метаданных (namespace, name, annotations, ...)
        metav1.ObjectMeta `json:"metadata,omitempty"`

  //кластер содержит спецификацию и текущий статус
        Spec   AerospikeClusterSpec   `json:"spec,omitempty"`
        Status AerospikeClusterStatus `json:"status,omitempty"`
}

//+kubebuilder:object:root=true

type AerospikeClusterList struct {
        metav1.TypeMeta `json:",inline"`
        metav1.ListMeta `json:"metadata,omitempty"`
        Items           []AerospikeCluster `json:"items"`
}

//регистрация типов объектов и их схем
func init() {
        SchemeBuilder.Register(&AerospikeCluster{}, &AerospikeClusterList{})
}

Добавим необходимые поля (и структуры) в структуру AerospikeClusterSpec и обозначим в статусе текущее состояние кластера: deployed (bool) - устанавливается в true когда развертывание завершено, brokenNodes (int) - количество недоступных узлов. Обновим определение типов и выполним повторно команду make в корневом каталоге проекта, это приведет к выполнению форматирования и проверки синтаксиса файлов.

type AerospikeHeartBeat struct {
        // Heartbeat interval in milliseconds
        Interval int `json:"interval,omitempty"`
}

type AerospikeKeepAlive struct {
        // Keep alive is enabled
        Enabled bool `json:"enabled,omitempty"`
        // Keep alive messages interval in seconds
        Interval int `json:"interval,omitempty"`
}

// ожидаемое состояние кластера (спецификация)
type AerospikeClusterSpec struct {
        // Deployments count
        Deployments int `json:"deployments,omitempty"`
        // Maximum memory limit in GB (for example, 4)
        MaxMemory *int `json:"maxMemory,omitempty"`
        // Maximum cpu limit (100 = full power)
        MaxCPU *int `json:"maxCPU,omitempty"`
        // Logging level (https://docs.aerospike.com/reference/configuration)
        Logging *string `json:"logging,omitempty"`
        // Compression level (1 - lower compression, 9 - higher, but slower compression)
        CompressionLevel *int `json:"compressionLevel,omitempty"`
        // Min replicas count
        ReplicationFactor int `json:"replicationFactor,omitempty"`
        // Enable strong consistency mode
        StrongConsistency bool `json:"strongConsistency,omitempty"`
        // Heartbeat configuration
        HeartBeat *AerospikeHeartBeat `json:"heartbeat,omitempty"`
        // Keepalive configuration
        Keepalive *AerospikeKeepAlive `json:"keepalive,omitempty"`
}

// текущее состояние кластера
type AerospikeClusterStatus struct {
        // All nodes are prepared and ready
        Deployed bool `json:"deployed"`
        // How many nodes isn't available
        BrokenNodes int `json:"brokenNodes"`
}

После применения make manifests можем убедиться, что описание создания CRD-ресурса соответствует измененной схеме (фрагмент приведен ниже):

compressionLevel:
	description: Compression level (1 - lower compression, 9 - higher,
		but slower compression)
	type: integer
deployments:
	description: Deployments count
	type: integer
heartbeat:
	description: Heartbeat configuration
	properties:
		interval:
			description: Heartbeat interval in milliseconds
			type: integer
		type: object

При необходимости создать дополнительную версию существующего ресурса или создать новый можно использовать как инструмент operator-sdk create api, так и создавать дополнительные каталоги и файлы на основе существующего шаблона.

Дальше посмотрим на контроллер и тут ключевую роль играет Reconciler, который выполняет изменение конфигурации в соответствии с разностью между текущим и ожидаемым состоянием. Кроме это для проверки корректности согласованности спецификации может быть добавлено использование webhook и здесь нужно сделать небольшое введение в Dynamic Admission Control. Динамическое управление позволяет зарегистрировать два вида дополнительных обработчиков во время операций над ресурсами - ValidatingWebhook (проверяет корректность схемы создаваемого ресурса, позволяет отменить операцию при нарушении схемы) и MutatingWebhook (допускает возможность внесения изменений в ресурс перед развертыванием, например используется для встраивания sidecar-контейнеров в операторе управления резервным копированием Stash, который мы рассматривали в этой статье). Operator SDK позволяет также создавать и регистрировать webhook и даже генерировать заготовку ValidatingWebhook для проверки согласованности данных .

operator-sdk create webhook --group aerospike --version v1alpha1 --kind AerospikeCluster --defaulting --programmatic-validation

После вызова генератора создается файл aerospikecluster_webhook.go, в котором нужно будет реализовать функции проверки конфигурации при создании ValidateCreate(), при обновлении ValidateUpdate() и при удалении ValidateDelete(). Добавим дополнительную проверку, что количество реплик не превышает количество узлов и количество узлов является натуральным числом. В результате получим следующий код:

package v1alpha1

import (
        apierrors "k8s.io/apimachinery/pkg/api/errors"
        "k8s.io/apimachinery/pkg/runtime"
        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/apimachinery/pkg/util/validation/field"
        ctrl "sigs.k8s.io/controller-runtime"
        logf "sigs.k8s.io/controller-runtime/pkg/log"
        "sigs.k8s.io/controller-runtime/pkg/webhook"
        "strconv"
)

// log is for logging in this package.
var aerospikeclusterlog = logf.Log.WithName("aerospikecluster-resource")

func (r *AerospikeCluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
        return ctrl.NewWebhookManagedBy(mgr).
                For(r).
                Complete()
}

//+kubebuilder:webhook:path=/mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=true,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=maerospikecluster.kb.io,admissionReviewVersions=v1

var _ webhook.Defaulter = &AerospikeCluster{}

//здесь можно применить значения по умолчанию (выполняется до валидации)
func (r *AerospikeCluster) Default() {
        aerospikeclusterlog.Info("default", "name", r.Name)
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=false,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=vaerospikecluster.kb.io,admissionReviewVersions=v1

var _ webhook.Validator = &AerospikeCluster{}

func (r *AerospikeCluster) validateReplicas() error {
        var allErrs field.ErrorList
        if r.Spec.ReplicationFactor > r.Spec.Deployments {
                fldPath := field.NewPath("spec").Child("replicationFactor")
                allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Replication factor must be lower than deployments count"))
        }
        if r.Spec.Deployments < 1 {
                fldPath := field.NewPath("spec").Child("deployments")
                allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Deployments counter must be greater than zero"))
        }
        if len(allErrs) == 0 {
                return nil
        }
        return apierrors.NewInvalid(schema.GroupKind{Group: "aerospike.dzolotov.tech", Kind: "AerospikeCluster"}, r.Name, allErrs)
}

//проверка при создании нового ресурса
func (r *AerospikeCluster) ValidateCreate() error {
        aerospikeclusterlog.Info("validate create", "name", r.Name)
        return r.validateReplicas()
}

//проверка при обновлении ресурса
func (r *AerospikeCluster) ValidateUpdate(old runtime.Object) error {
        aerospikeclusterlog.Info("validate update", "name", r.Name)
        return r.validateReplicas()
}

//проверка при удалении ресурса (сейчас отключена в метаданных kubebuilder)
func (r *AerospikeCluster) ValidateDelete() error {
        aerospikeclusterlog.Info("validate delete", "name", r.Name)
        return nil
}

Здесь мы используем механизм apierrors, рекомендуемый Kubernetes для подробного описания ошибки и места ее возникновения, который позволяет объединить все ошибки в один объект, но можно было использовать и обычный объект ошибки. Обратите внимание на комментарии //+kubebuilder:webhook, они содержат метаданные, которые используются для генерации манифеста и здесь, например, можно изменить список валидаций (сейчас используются только create,update, но при необходимости можно сделать валидацию на удаление (например, так можно запретить удаление ресурса, но лучше такого не делать). После вызова make / make manifests посмотрим на сгенерированный файл манифеста для установки AdmissionWebhooks:

apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
  creationTimestamp: null
  name: mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
  - v1
  clientConfig:
    service:
      name: webhook-service
      namespace: system
      path: /mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster
  failurePolicy: Fail
  name: maerospikecluster.kb.io
  rules:
  - apiGroups:
    - aerospike.dzolotov.tech
    apiVersions:
    - v1alpha1
    operations:
    - CREATE
    - UPDATE
    resources:
    - aerospikeclusters
  sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
  creationTimestamp: null
  name: validating-webhook-configuration
webhooks:
- admissionReviewVersions:
  - v1
  clientConfig:
    service:
      name: webhook-service
      namespace: system
      path: /validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster
  failurePolicy: Fail
  name: vaerospikecluster.kb.io
  rules:
  - apiGroups:
    - aerospike.dzolotov.tech
    apiVersions:
    - v1alpha1
    operations:
    - CREATE
    - UPDATE
    resources:
    - aerospikeclusters
  sideEffects: None

Здесь можно увидеть, что и MutatingWebhook и ValidatingWebhook применяются на ресурсы в apiVersion: aerospike.dzolotov.tech/v1alpha1 при операциях создания и обновления на ресурсах aerospikecluster. Теперь посмотрим на другие подкаталоги в config и поговорим о правах доступа (RBAC) и развертывании контроллера.

  • certmanager - манифест для издания сертификата для валидации webhook (создается автоматически), привязывается к сервису контроллера;

  • crd - описание типов ресурсов (CRD), мы рассмотрели его выше, но добавлю что кроме наших определений есть еще патчи, которые позволяют пробросить созданный через certmanager сертификат в аннотации ресурса для использования в коде;

  • default - содержит патчи для контроллера, которые публикуют webhook-сервер и разворачивают system/controller-manager из gcr.io/kubebuilder/kube-rbac-proxy:v0.11.0;

  • manifests - конфигурация для kustomization, который создает манифесты при сборке финального артефакта для установки оператора;

  • prometheus - регистрация ServiceMonitor для экспорта метрик из controller-manager в prometheus;

  • rbac - набор прав для обеспечения доступа контроллера к управлению ресурсами aerospikeclusters, а также для публикации метрик, сервисной записи для выполнения controller-manager и др.;

  • scorecard - конфигурация для запуска тестов через OLM (Operator Lifecycle Manager, позволяет управлять запуском и тестирование операторов внутри существующего кластера kubernetes);

  • webhook - конфигурация для регистрации AdminissionWebhooks (рассмотрели ранее).

Коротко об RBAC (Role-Based Access Control) - это внутренний механизм контроля доступа Kubernetes, который определяет понятие Role (или ClusterRole, если без привязки к пространству имен), перечисляющее какие действия и над какими ресурсами могут быть выполнены и RoleBinding (ClusterRoleBinding) для связывания учетной записи пользователи или сервисной записи (ServiceAccount) с соответствующей ролью. Таким образом, чтобы оператор мог создавать новые ресурсы Deployment, требуется определить роль с разрешениями на create/update/patch/delete для ресурсов kind: Deployment в apiVersion: apps/v1 и связать ее с сервисной записью, под которой происходит развертывание оператора. Для управление нашим контроллером будет запущен процесс controller-manager, он также будет отслеживать доступность контроллера и перезапускать его при необходимости.

Вернемся теперь к контроллеру (controllers/aerospikecluster_controller.go) и посмотрим внимательно на его структуру:

package controllers

import (
        "context"

        "k8s.io/apimachinery/pkg/runtime"
        ctrl "sigs.k8s.io/controller-runtime"
        "sigs.k8s.io/controller-runtime/pkg/client"
        "sigs.k8s.io/controller-runtime/pkg/log"

        aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1"
)

// Объект реконсилера для приведения кластера к ожидаемому состоянию
type AerospikeClusterReconciler struct {
        client.Client
        Scheme *runtime.Scheme
}

// Описание разрешений (будут транслированы в роли)
//+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/finalizers,verbs=update

func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
        _ = log.FromContext(ctx)

			  //здесь будет наш код управления кластером
        return ctrl.Result{}, nil
}

// Регистрация с использованием ControllerManager
func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
        return ctrl.NewControllerManagedBy(mgr).
                For(&aerospikev1alpha1.AerospikeCluster{}).
                Complete(r)
}

Поскольку мы будем управлять развертыванием кластера серверов (будем использовать StatefulSet, поскольку нам важно сохранить связь экземпляра пода и его местоположения, также нам будет нужно иметь возможность создания и изменения ConfigMap для определения конфигурации сервера по параметрам из манифеста), то нам нужно будет сообщить контроллеру о том, что мы будем наблюдать за ресурсами StatefulSet и Pod и добавить соответствующие правила в комментариях +kubebuilder. Прежде всего добавим импорт структур, описывающих пространство имен apps/v1 и core/v1 (для ConfigMap):

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
  corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1"
)

И сообщим менеджеру о том, что мы будем управлять объектами StatefulSet и ConfigMap:

func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&aerospikev1alpha1.AerospikeCluster{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.ConfigMap{}).
		Complete(r)
}

И добавим метаданные перед методом Reconcile для описания требуемых прав доступа:

//+kubebuilder:rbac:groups=apps/v1,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps/v1,resources=pods,verbs=get;list
//+kubebuilder:rbac:groups=v1,resources=configmaps,verbs=get;list;create;update;patch;delete

Сама реализация будет использовать публичные методы API Kubernetes, прежде всего для нас интересны методы:

  • r.List(ctx, objectList, opts) - возвращает список объектов в objectList с отбором по opts (требует разрешения list на этот тип объектов в RBAC-роли), например:

podList := &v1.PodList{}
opts := []client.ListOption{
		client.InNamespace(request.NamespacedName.Namespace),
		client.MatchingLabels{"instance": request.NamespacedName.Name},
		client.MatchingFields{"status.phase": "Running"},
}
err := r.List(ctx, podList, opts...)

Выбирает все запущенные поды с меткой instance и значением, совпадающих с именем кластера (поиск осуществляется в том же пространстве имен, где развернут ресурс AerospikeCluster).

  • r.Get(ctx, namespacedName, object) - сохраняет объект в object, идентификацию объекта выполняет NamespacedName, включающий пространство имен Namespace и имя внутри него Name (требует разрешение get на этот тип объектов в RBAC-роли);

  • r.Create(ctx, object, opts) - создает новый объект в кластере (требуется наличие разрешений create на этот тип объекта в RBAC-роли), например:

func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx)

  //прочитаем ресурс описания кластера
	cluster := &aerospikev1alpha1.AerospikeCluster{}
	err := r.Get(ctx, req.NamespacedName, cluster)
	if err != nil {
		return ctrl.Result{}, err
	}

  //преобразование числовых полей в строки и в тип int32 для передачи в структуру
  //описания StatefulSet
	var deployments int32
	deployments = int32(cluster.Spec.Deployments)
	var replicationFactor string
	replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor)
	var maxMemory string
	maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory)
	var maxCpu string
	maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU)

  //описание нового объекта StatefulSet
	sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: req.NamespacedName.Namespace, Name: req.NamespacedName.Name}, Spec: appsv1.StatefulSetSpec{
		Replicas: &deployments,
		Selector: &v1.LabelSelector{
			MatchLabels: map[string]string{"label": req.Name},
		},
		Template: corev1.PodTemplateSpec{
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:    "aerospike",
						Image:   "aerospike",
						Args:    nil,
						EnvFrom: nil,
						Env: []corev1.EnvVar{
							{
								Name:  "REPL_FACTOR",
								Value replicationFactor,
							},
							{
								Name:  "MEM_GB",
								Value: maxMemory,
							},
						},
						Resources: corev1.ResourceRequirements{
							Limits: corev1.ResourceList{
								corev1.ResourceCPU: resource.MustParse(maxCpu),
							},
						},
					},
				},
			},
		},
		UpdateStrategy: appsv1.StatefulSetUpdateStrategy{},
	}}
	r.Create(ctx, &sts)

	return ctrl.Result{Requeue: true}, nil
}
  • r.Update(ctx, object, opts) - обновление конфигурации объекта (для StatefulSet например можно изменить переменные окружения и количество реплик), требуется разрешение update на этот тип ресурсов в RBAC-роли;

  • r.Patch(ctx, object, patch) - внесение исправлений в структуру, описывающую запущенный объект;

  • r.Delete(ctx, objects, opts) - удаление объекта из кластера (требуется разрешение delete на этот тип ресурсов в RBAC-роли);

  • r.Status().Update(ctx, cluster) - дает доступ к модификации объекта состояния (виден в манифесте и в объекте cluster как .status).

Поведение reconciler в нашем случае может быть описано несколькими возможными сценариями:

  • объект кластера существует, но нет statefulset - создаем новый statefulset и configmap;

  • объект кластера существует и есть statefulset - обновляем параметры конфигурации (лимиты, переменные окружения, количество реплик) и пересоздаем configmap;

  • объект кластера не существует - удаляем statefulset и configmap (игнорируем ошибки).

В целом код можно разделить на несколько частей: генерация и обновление configmap, обработка удаления кластера, обновление конфигурации существующего кластера. Код может выглядеть таким образом. Обновление или создание configmap:

func (r *AerospikeClusterReconciler) generateConfigMap(ctx context.Context, cluster *aerospikev1alpha1.AerospikeCluster) error {
	//проверим на наличие существующего configmap
	var cm corev1.ConfigMap
	err := r.Get(ctx, types.NamespacedName{
		Namespace: cluster.Namespace,
		Name:      cluster.Name,
	}, &cm)
	if err != nil {
		if errors.IsNotFound(err) {
			//создаем новый configmap
			newConfigMap := corev1.ConfigMap{
				ObjectMeta: v1.ObjectMeta{
					Namespace: cluster.Namespace,
					Name:      cluster.Name,
				},
				Data: map[string]string{
					"aerospike": createConfigMap(cluster),
				},
			}
			err := r.Create(ctx, &newConfigMap)
			return err
		} else {
			return err
		}
	}
	//если объект уже существует, то изменяем данные
	cm.Data = map[string]string{
		"aerospike": createConfigMap(cluster),
	}
	err = r.Update(ctx, &cm)
	return err
}

func createConfigMap(cluster *aerospikev1alpha1.AerospikeCluster) string {
	result := make([]string, 0, 0)
	if cluster.Spec.Logging != nil {
		result = append(result, "logging "+*cluster.Spec.Logging)
	}
	result = append(result, "strong-consistency "+strconv.FormatBool(cluster.Spec.StrongConsistency))
	if cluster.Spec.CompressionLevel != nil {
		result = append(result, "compression-level "+strconv.Itoa(*cluster.Spec.CompressionLevel))
	}
	result = append(result, "interval "+strconv.Itoa(cluster.Spec.HeartBeat.Interval))
	if cluster.Spec.Keepalive != nil {
		result = append(result, "keepalive-enabled "+strconv.FormatBool(cluster.Spec.Keepalive.Enabled))
		result = append(result, "keepalive-intvl "+strconv.Itoa(cluster.Spec.Keepalive.Interval))
	}
	return strings.Join(result, "\n")
}

Подготовка объекта StatefulSet по описанию кластера:

func (r *AerospikeClusterReconciler) getConfiguration(cluster *aerospikev1alpha1.AerospikeCluster) *appsv1.StatefulSet {
	var deployments int32
	deployments = int32(cluster.Spec.Deployments)
	var replicationFactor string
	replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor)
	var maxMemory string
	maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory)
	var maxCpu string
	maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU)

	sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: cluster.Namespace, Name: cluster.Name}, Spec: appsv1.StatefulSetSpec{
		Replicas: &deployments,
		Selector: &v1.LabelSelector{
			MatchLabels: map[string]string{"instance": cluster.Name},
		},
		Template: corev1.PodTemplateSpec{
			ObjectMeta: ctrl.ObjectMeta{
				Labels: map[string]string{"instance": cluster.Name},
			},
			Spec: corev1.PodSpec{
				Containers: []corev1.Container{
					{
						Name:    "aerospike",
						Image:   "aerospike",
						Args:    nil,
						EnvFrom: nil,
						VolumeMounts: []corev1.VolumeMount{
							{
								Name:      "config",
								MountPath: "/etc/aerospike/",
							},
						},
						Env: []corev1.EnvVar{
							{
								Name:  "REPL_FACTOR",
								Value: replicationFactor,
							},
							{
								Name:  "MEM_GB",
								Value: maxMemory,
							},
						},
						Resources: corev1.ResourceRequirements{
							Limits: corev1.ResourceList{
								corev1.ResourceCPU: resource.MustParse(maxCpu),
							},
						},
					},
				},
				Volumes: []corev1.Volume{
					{
						Name: "config",
						VolumeSource: corev1.VolumeSource{
							ConfigMap: &corev1.ConfigMapVolumeSource{
								LocalObjectReference: corev1.LocalObjectReference{Name: cluster.Name},
								Items: []corev1.KeyToPath{
									{
										Key:  "aerospike",
										Path: "aerospike.conf",
									},
								},
							},
						},
					},
				},
			},
		},
		UpdateStrategy: appsv1.StatefulSetUpdateStrategy{},
	}}
	return &sts
}

Логика удаления, создания и обновления ресурсов кластера и его состояния:

func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx) //можно сохранить в logger и использовать для сообщений об ошибках или диагностики

	cluster := &aerospikev1alpha1.AerospikeCluster{}
	err := r.Get(ctx, req.NamespacedName, cluster)
	if err != nil {
		if errors.IsNotFound(err) {
			//кластер был удален, удалим развертывание
			sts := &appsv1.StatefulSet{}
			err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, sts)
			if err == nil {
				err = r.Delete(ctx, sts)
				if err == nil {
					cm := &corev1.ConfigMap{}
					err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, cm)
				}
			}
		}
		//и завершаем reconfile успешно (или с ошибкой)
		return ctrl.Result{}, err
	}

	// Check if the deployment already exists, if not create a new deployment.
	found := &appsv1.StatefulSet{}
	err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, found)
	if err != nil {
		if errors.IsNotFound(err) {
			// существующего развертывания нет - создаем новое
			err2 := r.generateConfigMap(ctx, cluster) //создаем configmap
			if err2 != nil {
				return ctrl.Result{}, err
			}
			dep := r.getConfiguration(cluster)
			if err = r.Create(ctx, dep); err != nil {
				return ctrl.Result{}, err
			}
			cluster.Status.BrokenNodes = 0
			cluster.Status.Deployed = true
			r.Status().Update(ctx, cluster)
			return ctrl.Result{Requeue: true}, nil
		} else {
			return ctrl.Result{}, err
		}
	}

	deployments := int32(cluster.Spec.Deployments)
	//здесь можно сравнить текущий и предыдущий размер кластера и изменить конфигурацию, если требуется
	//но у нас могут поменяться обновременно и переменные окружения и configmap
	found.Spec.Replicas = &deployments
	var replicationFactor string
	replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor)
	var maxMemory string
	maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory)

	found.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{
		{
			Name:  "REPL_FACTOR",
			Value: replicationFactor,
		},
		{
			Name:  "MEM_GB",
			Value: maxMemory,
		},
	}

	found.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().Set(int64(*cluster.Spec.MaxCPU))
	err = r.generateConfigMap(ctx, cluster)
	if err != nil {
		return ctrl.Result{}, err
	}

	// получим активное количество подов и обновим BrokenNodes в состоянии
	podList := &corev1.PodList{}
	listOpts := []client.ListOption{
		client.InNamespace(cluster.Namespace),
		client.MatchingLabels{"instance": cluster.Name},
		client.MatchingFields{"status.phase": "Running"},
	}
	if err = r.List(ctx, podList, listOpts...); err != nil {
		return ctrl.Result{}, err
	}

	// обновим количество неактивных узлов
	cluster.Status.BrokenNodes = cluster.Spec.Deployments - len(podList.Items)
	err2 := r.Status().Update(ctx, cluster)

	return ctrl.Result{}, err2
}

Для применения созданного оператора можно использовать make deploy в корневом каталоге проекта или через OLM:

operator-sdk olm install
make bundle IMG="dzolotov.tech/aerospike-operator:v0.0.1"
make bundle-build bundle-push BUNDLE_IMG="dzolotov.tech/aerospike-operator-bundle:v0.0.1"
operator-sdk run bundle dzolotov.tech/aerospike-operator-bundle:v0.0.1

Здесь будет собран образ контейнера (его можно отправить, например, в Docker hub) и запущен оператор на текущем кластере Kubernetes. Для остановки оператора применяются действия make undeployили operator-sdk cleanup aerospike-operator (для OLM).

Проверить работу оператора можно через создание ресурса, который был определен ранее:

cat <<< EOF | kubectl apply -f -
apiVersion: dzolotov.tech/v1alpha1
kind: AerospikeCluster
metadata:
  namespace: cluster
  name: demo
spec:
  deployments: 2
  maxMemory: 2
  compressionLevel: 4
  replicationFactor: 1
  strongConsistency: false
  heartbeat:
    interval: 10000
EOF

С использованием операторов возможно реализовать сложную логику по управлению распределенными системами или обеспечить возможность конфигурирования разворачиваемых приложений в терминах предметной области. Большое количество операторов можно найти на https://operatorhub.io/, а документацию и статьи по разработке в этой коллекции ссылок.

Также хочу пригласить всех заинтересованных на бесплатный вебинар, в рамках которого мы комплексно рассмотрим основные векторы по обеспечению безопасности kubernetes кластера и подробно остановимся на каждом из них. Затронем тему безопасности docker-образов, безопасность в рантайме, network и application security.

Комментарии (0)