Оператором в Kubernetes принято называть развертывание, которое самостоятельно управляет ресурсами кластера, регистрирует новые Custom Resource Definition (CRD) и, в некоторых случаях, добавляется для наблюдения за существующими ресурсами (через механизмы Dynamic Admission Control). В этой статье на примере создания оператора для развертывания и управления кластером Aerospike мы попробуем разобраться с этапами создания оператора, способами взаимодействия с кластером и проблемами, с которыми можно встретиться в реальной практике. Всех практикующих DevOps и желающих поднять автоматизацию развертывания своих сервисов на новый уровень приглашаю под кат.
Прежде всего нужно определить, какую функциональность будет поддерживать будущий оператор. Поскольку база данных Aerospike допускает работу в режиме кластера, будет разумно создать CRD для описания характеристик кластера (количество подов, ограничения по памяти и процессору и др), конфигурации пространства имен, а также настройки резервного копирования. Также оператор должен отслеживать изменения в существующих ресурсах, связанных с ним, и динамически изменять топологию системы и/или настройки подов.
Для разработки оператора мы будем использовать язык программирования Go и набор инструментов Operator SDK, который нужно будет предварительно установить:
git clone https://github.com/operator-framework/operator-sdk
cd operator-sdk
make install
После установки SDK можно выполнить инициализацию заготовки проекта оператора:
operator-sdk init --domain dzolotov.tech --owner "Dmitrii Zolotov"
Поскольку мы предполагаем создание дополнительных типов ресурсов, необходимо определить идентификатор версии API, который представляет собой URI на произвольном домене. Для удобства использования operator-sdk можно сразу установить поддержку автодополнения в shell Debian/Ubuntu:
apt install bash-completion
echo 'source <(operator-sdk completion bash)' >>~/.bashrc
Operator SDK поддерживает создание операторов для разных DevOps-решений, включая Ansible и Helm, для этого при инициализации можно указывать необходимые плагины. Также инструмент командной строки позволяет выполнять развертывание оператора (operator-sdk olm
), запуск оператора в выбранном контексте (operator-sdk run
) и выполнять тесты (operator-sdk scorecard
). Также можно создавать фрагменты кода для реализации разных типов Kubernetes-ресурсов. Начнем с создания версии apiVersion:
operator-sdk create api --group aerospike --version v1alpha1 --kind AerospikeCluster --resource --controller
Генератор создает несколько файлов для взаимодействия с Kubernetes API:
api/v1alpha1/aerospikecluster_types.go
- описание типов (CRD);controllers/aerospikecluster_controller.go
- контроллер для создания и отслеживания типов CRD;/go.mod
- содержит зависимости для подключения к API (k8s.io/apimachinery - доступ к API для управления ресурсами, k8s.io/client-go - Go-клиент для взаимодействия с API, sigs.k8s.io/controller-runtime - базовые классы для реализации контроллера, github.com/onsi/ginkgo - тестовый фреймворк для проверки оператора).
Попробуем сгенерировать манифесты для Kubernetes, которые будут определять Custom Resource Definition и развертывания оператора: make manifests
. Результатом выполнения будет генерация файлов в crd/bases - регистрация CRD-ресурсов (apiextensions.k8s.io/v1/CustomResourceDefinition
) с названием aerospikeclusters.aerospike.dzolotov.tech
для создаваемого API (группа aerospike.my.domain, тип ресурса AerospikeCluster
). По умолчанию схема ресурса содержит строковое поле foo, далее мы переопределим схему в коде описания типов на Go:
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.8.0
name: aerospikeclusters.aerospike.my.domain
spec:
group: aerospike.dzolotov.tech
names:
kind: AerospikeCluster
listKind: AerospikeClusterList
plural: aerospikeclusters
singular: aerospikecluster
# будет локальным для namespace (может быть Cluster для создания ресурса на уровне кластера, без привязки к namespace)
scope: Namespaced
versions:
# создаваемая версия CRD
- name: v1alpha1
schema:
# описание схемы yaml-документа
openAPIV3Schema:
description: AerospikeCluster is the Schema for the aerospikeclusters API
properties:
# версия api (позволит делать разные версии для API)
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
type: string
# тип ресурса (в нашем случае будет AerospikeCluster)
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
type: string
# описание метаданных (.meta) - определяется по схеме metav1.ObjectMeta
metadata:
type: object
# описание спецификации кластера (.spec) - в соответствии со схемой
spec:
description: AerospikeClusterSpec defines the desired state of AerospikeCluster
properties:
# свойства спецификации кластера (.spec)
foo:
description: Foo is an example field of AerospikeCluster. Edit aerospikecluster_types.go
to remove/update
type: string
type: object
# назначение ключа объекта yaml .status (пустой объект)
status:
description: AerospikeClusterStatus defines the observed state of AerospikeCluster
type: object
type: object
# флаг актуальности версии CRD
served: true
# версия по умолчанию
storage: true
Примером корректного ресурса с использованием указанного CRD:
apiVersion: aerospike.my.domain/alpha1
kind: AerospikeCluster
metadata:
name: demo
spec:
foo: bar
status: {}
Прежде чем мы пойдем рассматривать код контроллера дополним схему данных и добавим следующие поля:
deployments (int) - количество серверов в кластере;
maxMemory (int) - ограничение по размеру памяти (в Гб);
maxCPU (string) - ограничение по использованию процессора;
logging (string) - уровень протоколирования;
compressionLevel (int) - уровень сжатия;
replicationFactor (int) - фактор репликации для кластера;
strongConsistency (bool) - включение режима строгой согласованности (в ущерб производительности);
-
heartbeat:
interval (int) - интервал для проверки доступности соседних узлов (в миллисекундах).
-
keepalive:
enabled (bool) - разрешено использование keepalive-сообщений;
interval (int) - интервал в секундах между сообщениями.
Для демонстрации такого набора будет достаточно, в реальном кластере разумеется нужно предусмотреть наличие всех необходимых опций для конфигурации узлов. Структура данных для спецификации кластера определяется в файле api/alphav1/aerospikecluster_types.go, сейчас там даны определения для всех создаваемых типов данных (кластер, список кластеров и связанные с ними типы):
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
//ожидаемое состояние кластера (спецификация)
type AerospikeClusterSpec struct {
Foo string `json:"foo,omitempty"`
}
//текущее состояние кластера
type AerospikeClusterStatus struct {
}
//+kubebuilder:object:root=true
//+kubebuilder:subresource:status
type AerospikeCluster struct {
metav1.TypeMeta `json:",inline"`
//стандартное определение метаданных (namespace, name, annotations, ...)
metav1.ObjectMeta `json:"metadata,omitempty"`
//кластер содержит спецификацию и текущий статус
Spec AerospikeClusterSpec `json:"spec,omitempty"`
Status AerospikeClusterStatus `json:"status,omitempty"`
}
//+kubebuilder:object:root=true
type AerospikeClusterList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []AerospikeCluster `json:"items"`
}
//регистрация типов объектов и их схем
func init() {
SchemeBuilder.Register(&AerospikeCluster{}, &AerospikeClusterList{})
}
Добавим необходимые поля (и структуры) в структуру AerospikeClusterSpec и обозначим в статусе текущее состояние кластера: deployed (bool) - устанавливается в true когда развертывание завершено, brokenNodes (int) - количество недоступных узлов. Обновим определение типов и выполним повторно команду make в корневом каталоге проекта, это приведет к выполнению форматирования и проверки синтаксиса файлов.
type AerospikeHeartBeat struct {
// Heartbeat interval in milliseconds
Interval int `json:"interval,omitempty"`
}
type AerospikeKeepAlive struct {
// Keep alive is enabled
Enabled bool `json:"enabled,omitempty"`
// Keep alive messages interval in seconds
Interval int `json:"interval,omitempty"`
}
// ожидаемое состояние кластера (спецификация)
type AerospikeClusterSpec struct {
// Deployments count
Deployments int `json:"deployments,omitempty"`
// Maximum memory limit in GB (for example, 4)
MaxMemory *int `json:"maxMemory,omitempty"`
// Maximum cpu limit (100 = full power)
MaxCPU *int `json:"maxCPU,omitempty"`
// Logging level (https://docs.aerospike.com/reference/configuration)
Logging *string `json:"logging,omitempty"`
// Compression level (1 - lower compression, 9 - higher, but slower compression)
CompressionLevel *int `json:"compressionLevel,omitempty"`
// Min replicas count
ReplicationFactor int `json:"replicationFactor,omitempty"`
// Enable strong consistency mode
StrongConsistency bool `json:"strongConsistency,omitempty"`
// Heartbeat configuration
HeartBeat *AerospikeHeartBeat `json:"heartbeat,omitempty"`
// Keepalive configuration
Keepalive *AerospikeKeepAlive `json:"keepalive,omitempty"`
}
// текущее состояние кластера
type AerospikeClusterStatus struct {
// All nodes are prepared and ready
Deployed bool `json:"deployed"`
// How many nodes isn't available
BrokenNodes int `json:"brokenNodes"`
}
После применения make manifests можем убедиться, что описание создания CRD-ресурса соответствует измененной схеме (фрагмент приведен ниже):
compressionLevel:
description: Compression level (1 - lower compression, 9 - higher,
but slower compression)
type: integer
deployments:
description: Deployments count
type: integer
heartbeat:
description: Heartbeat configuration
properties:
interval:
description: Heartbeat interval in milliseconds
type: integer
type: object
При необходимости создать дополнительную версию существующего ресурса или создать новый можно использовать как инструмент operator-sdk create api, так и создавать дополнительные каталоги и файлы на основе существующего шаблона.
Дальше посмотрим на контроллер и тут ключевую роль играет Reconciler, который выполняет изменение конфигурации в соответствии с разностью между текущим и ожидаемым состоянием. Кроме это для проверки корректности согласованности спецификации может быть добавлено использование webhook и здесь нужно сделать небольшое введение в Dynamic Admission Control. Динамическое управление позволяет зарегистрировать два вида дополнительных обработчиков во время операций над ресурсами - ValidatingWebhook (проверяет корректность схемы создаваемого ресурса, позволяет отменить операцию при нарушении схемы) и MutatingWebhook (допускает возможность внесения изменений в ресурс перед развертыванием, например используется для встраивания sidecar-контейнеров в операторе управления резервным копированием Stash, который мы рассматривали в этой статье). Operator SDK позволяет также создавать и регистрировать webhook и даже генерировать заготовку ValidatingWebhook для проверки согласованности данных .
operator-sdk create webhook --group aerospike --version v1alpha1 --kind AerospikeCluster --defaulting --programmatic-validation
После вызова генератора создается файл aerospikecluster_webhook.go, в котором нужно будет реализовать функции проверки конфигурации при создании ValidateCreate(), при обновлении ValidateUpdate() и при удалении ValidateDelete(). Добавим дополнительную проверку, что количество реплик не превышает количество узлов и количество узлов является натуральным числом. В результате получим следующий код:
package v1alpha1
import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"strconv"
)
// log is for logging in this package.
var aerospikeclusterlog = logf.Log.WithName("aerospikecluster-resource")
func (r *AerospikeCluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(r).
Complete()
}
//+kubebuilder:webhook:path=/mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=true,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=maerospikecluster.kb.io,admissionReviewVersions=v1
var _ webhook.Defaulter = &AerospikeCluster{}
//здесь можно применить значения по умолчанию (выполняется до валидации)
func (r *AerospikeCluster) Default() {
aerospikeclusterlog.Info("default", "name", r.Name)
}
// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster,mutating=false,failurePolicy=fail,sideEffects=None,groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=create;update,versions=v1alpha1,name=vaerospikecluster.kb.io,admissionReviewVersions=v1
var _ webhook.Validator = &AerospikeCluster{}
func (r *AerospikeCluster) validateReplicas() error {
var allErrs field.ErrorList
if r.Spec.ReplicationFactor > r.Spec.Deployments {
fldPath := field.NewPath("spec").Child("replicationFactor")
allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Replication factor must be lower than deployments count"))
}
if r.Spec.Deployments < 1 {
fldPath := field.NewPath("spec").Child("deployments")
allErrs = append(allErrs, field.Invalid(fldPath, strconv.Itoa(r.Spec.ReplicationFactor), "Deployments counter must be greater than zero"))
}
if len(allErrs) == 0 {
return nil
}
return apierrors.NewInvalid(schema.GroupKind{Group: "aerospike.dzolotov.tech", Kind: "AerospikeCluster"}, r.Name, allErrs)
}
//проверка при создании нового ресурса
func (r *AerospikeCluster) ValidateCreate() error {
aerospikeclusterlog.Info("validate create", "name", r.Name)
return r.validateReplicas()
}
//проверка при обновлении ресурса
func (r *AerospikeCluster) ValidateUpdate(old runtime.Object) error {
aerospikeclusterlog.Info("validate update", "name", r.Name)
return r.validateReplicas()
}
//проверка при удалении ресурса (сейчас отключена в метаданных kubebuilder)
func (r *AerospikeCluster) ValidateDelete() error {
aerospikeclusterlog.Info("validate delete", "name", r.Name)
return nil
}
Здесь мы используем механизм apierrors, рекомендуемый Kubernetes для подробного описания ошибки и места ее возникновения, который позволяет объединить все ошибки в один объект, но можно было использовать и обычный объект ошибки. Обратите внимание на комментарии //+kubebuilder:webhook, они содержат метаданные, которые используются для генерации манифеста и здесь, например, можно изменить список валидаций (сейчас используются только create,update, но при необходимости можно сделать валидацию на удаление (например, так можно запретить удаление ресурса, но лучше такого не делать). После вызова make / make manifests посмотрим на сгенерированный файл манифеста для установки AdmissionWebhooks:
apiVersion: admissionregistration.k8s.io/v1
kind: MutatingWebhookConfiguration
metadata:
creationTimestamp: null
name: mutating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /mutate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster
failurePolicy: Fail
name: maerospikecluster.kb.io
rules:
- apiGroups:
- aerospike.dzolotov.tech
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- aerospikeclusters
sideEffects: None
---
apiVersion: admissionregistration.k8s.io/v1
kind: ValidatingWebhookConfiguration
metadata:
creationTimestamp: null
name: validating-webhook-configuration
webhooks:
- admissionReviewVersions:
- v1
clientConfig:
service:
name: webhook-service
namespace: system
path: /validate-aerospike-dzolotov-tech-v1alpha1-aerospikecluster
failurePolicy: Fail
name: vaerospikecluster.kb.io
rules:
- apiGroups:
- aerospike.dzolotov.tech
apiVersions:
- v1alpha1
operations:
- CREATE
- UPDATE
resources:
- aerospikeclusters
sideEffects: None
Здесь можно увидеть, что и MutatingWebhook и ValidatingWebhook применяются на ресурсы в apiVersion: aerospike.dzolotov.tech/v1alpha1 при операциях создания и обновления на ресурсах aerospikecluster. Теперь посмотрим на другие подкаталоги в config и поговорим о правах доступа (RBAC) и развертывании контроллера.
certmanager
- манифест для издания сертификата для валидации webhook (создается автоматически), привязывается к сервису контроллера;crd
- описание типов ресурсов (CRD), мы рассмотрели его выше, но добавлю что кроме наших определений есть еще патчи, которые позволяют пробросить созданный через certmanager сертификат в аннотации ресурса для использования в коде;default
- содержит патчи для контроллера, которые публикуют webhook-сервер и разворачивают system/controller-manager из gcr.io/kubebuilder/kube-rbac-proxy:v0.11.0;manifests
- конфигурация для kustomization, который создает манифесты при сборке финального артефакта для установки оператора;prometheus
- регистрация ServiceMonitor для экспорта метрик из controller-manager в prometheus;rbac
- набор прав для обеспечения доступа контроллера к управлению ресурсами aerospikeclusters, а также для публикации метрик, сервисной записи для выполнения controller-manager и др.;scorecard
- конфигурация для запуска тестов через OLM (Operator Lifecycle Manager, позволяет управлять запуском и тестирование операторов внутри существующего кластера kubernetes);webhook
- конфигурация для регистрации AdminissionWebhooks (рассмотрели ранее).
Коротко об RBAC (Role-Based Access Control) - это внутренний механизм контроля доступа Kubernetes, который определяет понятие Role (или ClusterRole, если без привязки к пространству имен), перечисляющее какие действия и над какими ресурсами могут быть выполнены и RoleBinding (ClusterRoleBinding) для связывания учетной записи пользователи или сервисной записи (ServiceAccount) с соответствующей ролью. Таким образом, чтобы оператор мог создавать новые ресурсы Deployment, требуется определить роль с разрешениями на create/update/patch/delete для ресурсов kind: Deployment в apiVersion: apps/v1 и связать ее с сервисной записью, под которой происходит развертывание оператора. Для управление нашим контроллером будет запущен процесс controller-manager, он также будет отслеживать доступность контроллера и перезапускать его при необходимости.
Вернемся теперь к контроллеру (controllers/aerospikecluster_controller.go) и посмотрим внимательно на его структуру:
package controllers
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1"
)
// Объект реконсилера для приведения кластера к ожидаемому состоянию
type AerospikeClusterReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// Описание разрешений (будут транслированы в роли)
//+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=aerospike.dzolotov.tech,resources=aerospikeclusters/finalizers,verbs=update
func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
//здесь будет наш код управления кластером
return ctrl.Result{}, nil
}
// Регистрация с использованием ControllerManager
func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&aerospikev1alpha1.AerospikeCluster{}).
Complete(r)
}
Поскольку мы будем управлять развертыванием кластера серверов (будем использовать StatefulSet, поскольку нам важно сохранить связь экземпляра пода и его местоположения, также нам будет нужно иметь возможность создания и изменения ConfigMap для определения конфигурации сервера по параметрам из манифеста), то нам нужно будет сообщить контроллеру о том, что мы будем наблюдать за ресурсами StatefulSet и Pod и добавить соответствующие правила в комментариях +kubebuilder. Прежде всего добавим импорт структур, описывающих пространство имен apps/v1 и core/v1 (для ConfigMap):
import (
"context"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
aerospikev1alpha1 "github.com/operator-framework/operator-sdk/api/v1alpha1"
)
И сообщим менеджеру о том, что мы будем управлять объектами StatefulSet и ConfigMap:
func (r *AerospikeClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&aerospikev1alpha1.AerospikeCluster{}).
Owns(&appsv1.StatefulSet{}).
Owns(&corev1.ConfigMap{}).
Complete(r)
}
И добавим метаданные перед методом Reconcile для описания требуемых прав доступа:
//+kubebuilder:rbac:groups=apps/v1,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=apps/v1,resources=pods,verbs=get;list
//+kubebuilder:rbac:groups=v1,resources=configmaps,verbs=get;list;create;update;patch;delete
Сама реализация будет использовать публичные методы API Kubernetes, прежде всего для нас интересны методы:
r.List(ctx, objectList, opts)
- возвращает список объектов в objectList с отбором по opts (требует разрешения list на этот тип объектов в RBAC-роли), например:
podList := &v1.PodList{}
opts := []client.ListOption{
client.InNamespace(request.NamespacedName.Namespace),
client.MatchingLabels{"instance": request.NamespacedName.Name},
client.MatchingFields{"status.phase": "Running"},
}
err := r.List(ctx, podList, opts...)
Выбирает все запущенные поды с меткой instance и значением, совпадающих с именем кластера (поиск осуществляется в том же пространстве имен, где развернут ресурс AerospikeCluster).
r.Get(ctx, namespacedName, object)
- сохраняет объект в object, идентификацию объекта выполняет NamespacedName, включающий пространство имен Namespace и имя внутри него Name (требует разрешение get на этот тип объектов в RBAC-роли);
r.Create(ctx, object, opts)
- создает новый объект в кластере (требуется наличие разрешений create на этот тип объекта в RBAC-роли), например:
func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
//прочитаем ресурс описания кластера
cluster := &aerospikev1alpha1.AerospikeCluster{}
err := r.Get(ctx, req.NamespacedName, cluster)
if err != nil {
return ctrl.Result{}, err
}
//преобразование числовых полей в строки и в тип int32 для передачи в структуру
//описания StatefulSet
var deployments int32
deployments = int32(cluster.Spec.Deployments)
var replicationFactor string
replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor)
var maxMemory string
maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory)
var maxCpu string
maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU)
//описание нового объекта StatefulSet
sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: req.NamespacedName.Namespace, Name: req.NamespacedName.Name}, Spec: appsv1.StatefulSetSpec{
Replicas: &deployments,
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{"label": req.Name},
},
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "aerospike",
Image: "aerospike",
Args: nil,
EnvFrom: nil,
Env: []corev1.EnvVar{
{
Name: "REPL_FACTOR",
Value replicationFactor,
},
{
Name: "MEM_GB",
Value: maxMemory,
},
},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(maxCpu),
},
},
},
},
},
},
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{},
}}
r.Create(ctx, &sts)
return ctrl.Result{Requeue: true}, nil
}
r.Update(ctx, object, opts)
- обновление конфигурации объекта (для StatefulSet например можно изменить переменные окружения и количество реплик), требуется разрешение update на этот тип ресурсов в RBAC-роли;r.Patch(ctx, object, patch)
- внесение исправлений в структуру, описывающую запущенный объект;r.Delete(ctx, objects, opts)
- удаление объекта из кластера (требуется разрешение delete на этот тип ресурсов в RBAC-роли);r.Status().Update(ctx, cluster)
- дает доступ к модификации объекта состояния (виден в манифесте и в объекте cluster как .status).
Поведение reconciler в нашем случае может быть описано несколькими возможными сценариями:
объект кластера существует, но нет statefulset - создаем новый statefulset и configmap;
объект кластера существует и есть statefulset - обновляем параметры конфигурации (лимиты, переменные окружения, количество реплик) и пересоздаем configmap;
объект кластера не существует - удаляем statefulset и configmap (игнорируем ошибки).
В целом код можно разделить на несколько частей: генерация и обновление configmap, обработка удаления кластера, обновление конфигурации существующего кластера. Код может выглядеть таким образом. Обновление или создание configmap:
func (r *AerospikeClusterReconciler) generateConfigMap(ctx context.Context, cluster *aerospikev1alpha1.AerospikeCluster) error {
//проверим на наличие существующего configmap
var cm corev1.ConfigMap
err := r.Get(ctx, types.NamespacedName{
Namespace: cluster.Namespace,
Name: cluster.Name,
}, &cm)
if err != nil {
if errors.IsNotFound(err) {
//создаем новый configmap
newConfigMap := corev1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Namespace: cluster.Namespace,
Name: cluster.Name,
},
Data: map[string]string{
"aerospike": createConfigMap(cluster),
},
}
err := r.Create(ctx, &newConfigMap)
return err
} else {
return err
}
}
//если объект уже существует, то изменяем данные
cm.Data = map[string]string{
"aerospike": createConfigMap(cluster),
}
err = r.Update(ctx, &cm)
return err
}
func createConfigMap(cluster *aerospikev1alpha1.AerospikeCluster) string {
result := make([]string, 0, 0)
if cluster.Spec.Logging != nil {
result = append(result, "logging "+*cluster.Spec.Logging)
}
result = append(result, "strong-consistency "+strconv.FormatBool(cluster.Spec.StrongConsistency))
if cluster.Spec.CompressionLevel != nil {
result = append(result, "compression-level "+strconv.Itoa(*cluster.Spec.CompressionLevel))
}
result = append(result, "interval "+strconv.Itoa(cluster.Spec.HeartBeat.Interval))
if cluster.Spec.Keepalive != nil {
result = append(result, "keepalive-enabled "+strconv.FormatBool(cluster.Spec.Keepalive.Enabled))
result = append(result, "keepalive-intvl "+strconv.Itoa(cluster.Spec.Keepalive.Interval))
}
return strings.Join(result, "\n")
}
Подготовка объекта StatefulSet по описанию кластера:
func (r *AerospikeClusterReconciler) getConfiguration(cluster *aerospikev1alpha1.AerospikeCluster) *appsv1.StatefulSet {
var deployments int32
deployments = int32(cluster.Spec.Deployments)
var replicationFactor string
replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor)
var maxMemory string
maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory)
var maxCpu string
maxCpu = strconv.Itoa(*cluster.Spec.MaxCPU)
sts := appsv1.StatefulSet{ObjectMeta: ctrl.ObjectMeta{Namespace: cluster.Namespace, Name: cluster.Name}, Spec: appsv1.StatefulSetSpec{
Replicas: &deployments,
Selector: &v1.LabelSelector{
MatchLabels: map[string]string{"instance": cluster.Name},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: ctrl.ObjectMeta{
Labels: map[string]string{"instance": cluster.Name},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "aerospike",
Image: "aerospike",
Args: nil,
EnvFrom: nil,
VolumeMounts: []corev1.VolumeMount{
{
Name: "config",
MountPath: "/etc/aerospike/",
},
},
Env: []corev1.EnvVar{
{
Name: "REPL_FACTOR",
Value: replicationFactor,
},
{
Name: "MEM_GB",
Value: maxMemory,
},
},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(maxCpu),
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "config",
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{Name: cluster.Name},
Items: []corev1.KeyToPath{
{
Key: "aerospike",
Path: "aerospike.conf",
},
},
},
},
},
},
},
},
UpdateStrategy: appsv1.StatefulSetUpdateStrategy{},
}}
return &sts
}
Логика удаления, создания и обновления ресурсов кластера и его состояния:
func (r *AerospikeClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx) //можно сохранить в logger и использовать для сообщений об ошибках или диагностики
cluster := &aerospikev1alpha1.AerospikeCluster{}
err := r.Get(ctx, req.NamespacedName, cluster)
if err != nil {
if errors.IsNotFound(err) {
//кластер был удален, удалим развертывание
sts := &appsv1.StatefulSet{}
err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, sts)
if err == nil {
err = r.Delete(ctx, sts)
if err == nil {
cm := &corev1.ConfigMap{}
err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, cm)
}
}
}
//и завершаем reconfile успешно (или с ошибкой)
return ctrl.Result{}, err
}
// Check if the deployment already exists, if not create a new deployment.
found := &appsv1.StatefulSet{}
err = r.Get(ctx, types.NamespacedName{Name: cluster.Name, Namespace: cluster.Namespace}, found)
if err != nil {
if errors.IsNotFound(err) {
// существующего развертывания нет - создаем новое
err2 := r.generateConfigMap(ctx, cluster) //создаем configmap
if err2 != nil {
return ctrl.Result{}, err
}
dep := r.getConfiguration(cluster)
if err = r.Create(ctx, dep); err != nil {
return ctrl.Result{}, err
}
cluster.Status.BrokenNodes = 0
cluster.Status.Deployed = true
r.Status().Update(ctx, cluster)
return ctrl.Result{Requeue: true}, nil
} else {
return ctrl.Result{}, err
}
}
deployments := int32(cluster.Spec.Deployments)
//здесь можно сравнить текущий и предыдущий размер кластера и изменить конфигурацию, если требуется
//но у нас могут поменяться обновременно и переменные окружения и configmap
found.Spec.Replicas = &deployments
var replicationFactor string
replicationFactor = strconv.Itoa(cluster.Spec.ReplicationFactor)
var maxMemory string
maxMemory = strconv.Itoa(*cluster.Spec.MaxMemory)
found.Spec.Template.Spec.Containers[0].Env = []corev1.EnvVar{
{
Name: "REPL_FACTOR",
Value: replicationFactor,
},
{
Name: "MEM_GB",
Value: maxMemory,
},
}
found.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().Set(int64(*cluster.Spec.MaxCPU))
err = r.generateConfigMap(ctx, cluster)
if err != nil {
return ctrl.Result{}, err
}
// получим активное количество подов и обновим BrokenNodes в состоянии
podList := &corev1.PodList{}
listOpts := []client.ListOption{
client.InNamespace(cluster.Namespace),
client.MatchingLabels{"instance": cluster.Name},
client.MatchingFields{"status.phase": "Running"},
}
if err = r.List(ctx, podList, listOpts...); err != nil {
return ctrl.Result{}, err
}
// обновим количество неактивных узлов
cluster.Status.BrokenNodes = cluster.Spec.Deployments - len(podList.Items)
err2 := r.Status().Update(ctx, cluster)
return ctrl.Result{}, err2
}
Для применения созданного оператора можно использовать make deploy
в корневом каталоге проекта или через OLM:
operator-sdk olm install
make bundle IMG="dzolotov.tech/aerospike-operator:v0.0.1"
make bundle-build bundle-push BUNDLE_IMG="dzolotov.tech/aerospike-operator-bundle:v0.0.1"
operator-sdk run bundle dzolotov.tech/aerospike-operator-bundle:v0.0.1
Здесь будет собран образ контейнера (его можно отправить, например, в Docker hub) и запущен оператор на текущем кластере Kubernetes. Для остановки оператора применяются действия make undeploy
или operator-sdk cleanup aerospike-operator
(для OLM).
Проверить работу оператора можно через создание ресурса, который был определен ранее:
cat <<< EOF | kubectl apply -f -
apiVersion: dzolotov.tech/v1alpha1
kind: AerospikeCluster
metadata:
namespace: cluster
name: demo
spec:
deployments: 2
maxMemory: 2
compressionLevel: 4
replicationFactor: 1
strongConsistency: false
heartbeat:
interval: 10000
EOF
С использованием операторов возможно реализовать сложную логику по управлению распределенными системами или обеспечить возможность конфигурирования разворачиваемых приложений в терминах предметной области. Большое количество операторов можно найти на https://operatorhub.io/, а документацию и статьи по разработке в этой коллекции ссылок.
Также хочу пригласить всех заинтересованных на бесплатный вебинар, в рамках которого мы комплексно рассмотрим основные векторы по обеспечению безопасности kubernetes кластера и подробно остановимся на каждом из них. Затронем тему безопасности docker-образов, безопасность в рантайме, network и application security.