Вступление

Всем привет! Я стал замечать на себе, что чаще решаю задачи, связанные с эксплуатацией инфраструктуры (Ops), чем с разработкой. Это видно по моим статьям — они в основном посвящены настройке и развертыванию различных инструментов. Однако «Dev»-составляющая тоже важна, и поэтому я решил прокачиваться еще и в разработке!

В этой статье я хочу поделиться опытом создания Kubernetes-оператора на Go. Этот язык часто используют для подобных задач, а сама идея родилась из практической потребности — автоматизировать отслеживание изменений в кластере и отправку уведомлений в Telegram. Сначала мы разберемся со структурой оператора, затем опишем кастомные ресурсы (Custom Resources), реализуем для них контроллеры и наконец развернем готовый оператор в тестовом кластере. Приступим!

Оглавление

Оглавление

Критика - это хорошо!

Автор не является Go-разработчиком, поэтому если вы заметили недочеты в коде или терминологии, то комментарии (или ЛС) всегда открыты. Спасибо за обратную связь!

Что такое операторы?

Kubernetes из коробки предлагает достаточный объем инструментов для развертывания сервисов. С их помощью можно покрыть почти что все потребности.

Но иногда встроенных возможностей куба не хватает. Тогда на помощь приходят операторы. Например, возьмем CloudNativePG (CNPG), один из операторов для развертывания PostgreSQL в Kubernetes. Установить его можно через kubectl или helm. Так оператор берет на себя конфигурирование и поддержку СУБД, включая хранение настройку, бэкапирование, репликацию и fail-over. Вам необходимо лишь описать в YAML формате ресурс c необходимыми параметрами.

При установке оператора в ваш кластер так же добавляются Custom Resource Definition. Список кастомных ресурсов, которые предоставляет CNPG, можно найти тут.

Рассмотрим в качестве примере Cluster. Именно тут описываются основные параметры инстанса СУБД - кол-во реплик, данные пользователей, настройки для конкретной базы данных и прочее. Вот так это выглядит:

apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
 name: postgresql
 namespace: default
spec:
 instances: 3
 imageName: ghcr.io/cloudnative-pg/postgresql:16.2-14
 storage:
   pvcTemplate:
     resources:
       requests:
         storage: 5Gi
 resources:
   requests:
     memory: "1Gi"
     cpu: 1
   limits:
     memory: "1Gi"
     cpu: 1

После применения манифеста оператор развернет три пода (один мастер, два слейва), сервисы для подключения, секреты с паролями, а так же сообщит, что кластер находится в запущенном состоянии:

$ kubectl get Cluster        
NAME         AGE    INSTANCES   READY   STATUS                     PRIMARY
postgresql   119s   3           3       Cluster in healthy state   postgresql-1

$ kubectl get pods   
NAME           READY   STATUS    RESTARTS   AGE
postgresql-1   1/1     Running   0          71s
postgresql-2   1/1     Running   0          50s
postgresql-3   1/1     Running   0          38s

PostgreSQL готов к использованию!

Идея собственного оператора

Функционал оператора в одном gif
Функционал оператора в одном gif

В этой статье я напишу оператор, который будет отслеживать определенные ресурсы Kubernetes, а затем отправлять о них информацию в Telegram-чат. Для этого я разверну базовый шаблон оператора на Golang через kubebuilder, опишу какие кастомные ресурсы для своего оператора, а затем разверну в кластере k8s. Погнали!

Ссылка на репо с исходным кодом: тут.

Инициализация оператора

Для создания оператора я воспользуюсь kubebuilder. После установки утилиты инициализируем оператор:

$ mkdir k8s-resource-tracker-operator
$ cd ./k8s-resource-tracker-operator
$ kubebuilder init --domain azamaton.ru --repo github.com/AzamatKomaev/k8s-resource-tracker-operator

В текущей директорий должна была появиться следующая структура:

  • cmd/main.go — точка запуска оператора. Тут описан пакет main и функция main.

  • api/ — директория, которая содержит Custom Resource Definitions (CRDs) и соответствующие им типы на Go.

  • config/default — директория с kustomization.yaml, который используется для установки оператора в кластер.

  • config/manager — директория с ресурсом Deployment для запуска оператора.

  • config/network‑policy — директория с ресурсами NetworkPolicy. По‑умолчанию содержит разрешение на получение метрик оператора из того же namespace.

  • config/prometheus — директория с ресурсами ServiceMonitor. Если у вас установлен Prometheus, то метрики из оператора автоматически будут скрапиться прометеусом.

  • config/rbac — директория с ресурсами для Kubernetes RBAC (SA, Role, RoleBinding, etc), необходимые оператору.

  • bin/ — это директория содержит исполняемые файлы, например kustomize.

  • test/ — директория с e2e тестами.

  • internal/ — директория, содержащая имплементацию контроллеров для определенных кастомных ресурсов (CR).

А теперь приступим к описанию Custom Resource Definition!

Описание структуры для ContactPoint

CRD (Custom Resource Definition) служит для создания своих собственных ресурсов. У моего оператора будут два ресурса: ContactPoint и TrackedField. С помощью ContactPoint будет возможность задавать место, куда будут приходить уведомления об изменений поля. TrackedField же, в свою очередь, ресурс и поле, за которым необходимо следить и уведомлять в чат. Начнем с ContactPoint.

Для начала с помощью команды kubebuilder create api создадим данный ресурс:

$ kubebuilder create api --group tg --version v1 --kind Conta
INFO Create Resource [y/n]                        
y
INFO Create Controller [y/n]                      
y
INFO Writing kustomize manifests for you to edit... 
INFO Writing scaffold for you to edit...          
INFO api/v1/contactpoint_types.go                 
INFO api/v1/groupversion_info.go                  
INFO internal/controller/suite_test.go            
INFO internal/controller/contactpoint_controller.go 
INFO internal/controller/contactpoint_controller_test.go 
<...>

Создаем и ресурс, и контроллер. kubebuilder добавил базовую заготовку для нашего ресурса в директорию api/v1/, а так же имплементацию контроллера для этого ресурса в internal/controller. Взгялнем на содержимое contactpoint_types.go:

api/v1/contactpoint_types.go
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// ContactPointSpec defines the desired state of ContactPoint.
type ContactPointSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file

	// Foo is an example field of ContactPoint. Edit contactpoint_types.go to remove/update
	Foo string `json:"foo,omitempty"`
}

// ContactPointStatus defines the observed state of ContactPoint.
type ContactPointStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// ContactPoint is the Schema for the contactpoints API.
type ContactPoint struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   ContactPointSpec   `json:"spec,omitempty"`
	Status ContactPointStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// ContactPointList contains a list of ContactPoint.
type ContactPointList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []ContactPoint `json:"items"`
}

func init() {
	SchemeBuilder.Register(&ContactPoint{}, &ContactPointList{})
}

У каждого ресурса в K8s должны быть поля apiVersion, kind, metadata, spec и status. Начнем описывать свой собственный ресурс. Для начала укажем типы ContactPoint-ов, куда у нас будет возможность отправлять уведомления. Для начала хватит Telegram и Webhook

type ContactPointType string

const (
	TelegramType ContactPointType = "Telegram"
	WebhookType  ContactPointType = "Webhook"
)

Для каждого из типов CP (ContactPoint) будет своя спецификация. Для Telegram мы должны будем указать Chat ID, тогда как для Webhook ссылку и название заголовка, который следует передавать при запросе.

type ContactPointTelegramSpec struct {
	ChatId int `json:"chatId"`
}

type ContactPointWebhookSpec struct {
	Url        string `json:"url"`
	HeaderName string `json:"headerName"`
}

Для отправки сообщения нам нужно передавать еще и api token. Хранить его в коде плохая практика, поэтому брать токен будем из ресурса типа Secret:

type ContactPointApiToken struct {
	SecretName string `json:"secretName"`
	Key        string `json:"key"`
}

Теперь опишем полную спецификаю ресурса:

type ContactPointSpec struct {
	Type         ContactPointType         `json:"type"`
	ApiToken     ContactPointApiToken     `json:"apiToken"`
	WebhookSpec  ContactPointWebhookSpec  `json:"webhookSpec,omitempty"`
	TelegramSpec ContactPointTelegramSpec `json:"telegramSpec,omitempty"`
}

Для полей WebhookSpec и TelegramSpec в параметрах передается значение omitempty, которое указывает, что поле опционально.

Теперь опишем поле для статуса. У нас будет два булевых значения. Initialized будет устанавливаться на true при самом первом запуске функции Reconcile (об этом дальше). Ready будет указывать на то, что ContactPoint готов отправлять уведомления в заданный чат. По-умолчанию оба значения равны false

type ContactPointStatus struct {
	Initialized bool `json:"initialized"`
	Ready       bool `json:"ready"`
}

И наконец весь ресурс со всеми полями. Обратите внимание на комментарии, которые начинаются на +kubebuilder:. Они служат для того, чтобы kubebuilder понимал, что type ContactPoint является корневым ресурсом:

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// ContactPoint is the Schema for the contactpoints API.
type ContactPoint struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   ContactPointSpec   `json:"spec,omitempty"`
	Status ContactPointStatus `json:"status,omitempty"`
}

Последняя структура необходима для представления списка ресурсов, получаемых при операции list:

// +kubebuilder:object:root=true

// ContactPointList contains a list of ContactPoint.
type ContactPointList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []ContactPoint `json:"items"`
}

Структуры готовы, теперь осталось сгенерировать CRD:

$ make manifests
/home/azamat/programming/go/habr-k8s-resource-tracker/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases

В папке config/crd/bases/ появился файл tg.azamaton.ru_contactpoints.yaml, содержащий ресурс CRD. Он был сгенерирован на основе описанных выше Go-структур:

config/crd/bases/tg.azamaton.ru_contactpoints.yaml
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.16.4
  name: contactpoints.tg.azamaton.ru
spec:
  group: tg.azamaton.ru
  names:
    kind: ContactPoint
    listKind: ContactPointList
    plural: contactpoints
    singular: contactpoint
  scope: Namespaced
  versions:
  - name: v1
    schema:
      openAPIV3Schema:
        description: ContactPoint is the Schema for the contactpoints API.
        properties:
          apiVersion:
            description: |-
              APIVersion defines the versioned schema of this representation of an object.
              Servers should convert recognized schemas to the latest internal value, and
              may reject unrecognized values.
              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
            type: string
          kind:
            description: |-
              Kind is a string value representing the REST resource this object represents.
              Servers may infer this from the endpoint the client submits requests to.
              Cannot be updated.
              In CamelCase.
              More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
            type: string
          metadata:
            type: object
          spec:
            properties:
              apiToken:
                properties:
                  key:
                    type: string
                  secretName:
                    type: string
                required:
                - key
                - secretName
                type: object
              telegramSpec:
                properties:
                  chatId:
                    type: integer
                required:
                - chatId
                type: object
              type:
                type: string
              webhookSpec:
                properties:
                  headerName:
                    type: string
                  url:
                    type: string
                required:
                - headerName
                - url
                type: object
            required:
            - apiToken
            - type
            type: object
          status:
            properties:
              initialized:
                type: boolean
              ready:
                type: boolean
            required:
            - initialized
            - ready
            type: object
        type: object
    served: true
    storage: true
    subresources:
      status: {}

Теперь добавим CRD в наш кластер. Если выполнять команду make install впервые, то kubebuilder установит в папку bin/ утилиту kustomize:

$ make install
/home/azamat/programming/go/habr-k8s-resource-tracker/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
Downloading sigs.k8s.io/kustomize/kustomize/v5@v5.5.0
/home/azamat/programming/go/habr-k8s-resource-tracker/bin/kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/contactpoints.tg.azamaton.ru created

Проверим, что CRD добавился:

$ kubectl get CustomResourceDefinition/contactpoints.tg.azamaton.ru
NAME                           CREATED AT
contactpoints.tg.azamaton.ru   2025-01-31T14:11:35Z

Реализация контроллера для ContactPoint

Теперь займемся логикой. Сейчас код контроллера для созданного ресурса выглядит следующим образом:

internal/controller/contactpoint_controller.go
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"

	"k8s.io/apimachinery/pkg/runtime"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	tgv1 "github.com/AzamatKomaev/k8s-resource-tracker-operator/api/v1"
)

// ContactPointReconciler reconciles a ContactPoint object
type ContactPointReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/finalizers,verbs=update

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the ContactPoint object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	_ = log.FromContext(ctx)

	// TODO(user): your logic here

	return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ContactPointReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&tgv1.ContactPoint{}).
		Named("contactpoint").
		Complete(r)
}

Больше всего тут нас интересует функция Reconcile. Именно в ней мы будем синхронизировать текущее состояние кастомного ресурса с желаемым.

Начнем с комментариев над фукнцией. С помощью комментария +kubebuilder:rbac мы можем указать какие RBAC роли нужны будут нашему оператору. По-умолчанию указаны полные права на взаимодействие с ресурсом ContactPoint. Дополнительно добавим роль для получения секретов, это необходимо для получения API-токена:

// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch

Функция Reconcile принимает два аргумента: ctx context.Context и req ctrl.Request . Context необходим для различных API действий, например для отмены запроса или его переноса. Request же содежит информацию о текущем ресурсе.

Начнем с простого: получение кастомного ресурса. Получим log чтобы выводить необходимую информацию в консоль и создадим переменную для будущего кастомного ресурса:

func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
  log := log.FromContext(ctx)
  
  var contactPoint tgv1.ContactPoint
  // <...>
}

У объекта Reconciler есть метод Get, в который нужно передать текущий контекст, объект NamespacedName и ссылку на ресурс. Про контекст я уже рассказывал чуть выше. NamespacedName же содержит два поля: Name и Namespace. Данных полей достаточно, чтобы найти ресурс в нашем кластере. Для кастомного ресурса мы будем брать его из объекта ctrl.Request:

  // <...>
  if err := r.Get(ctx, req.NamespacedName, &contactPoint); err != nil {
      log.V(1).Info("ContactPoint was deleted")
      return ctrl.Result{}, client.IgnoreNotFound(err)
  }
}

Сразу же проверяем err в блоке if. Если ресурс не был найден, то это означает, что его удалили. Поэтому в логах выводим соотвествующую информацию и возвращаем пустой объект Result и ошибку, переданную в client.IgnoreNotFound. Если ошибка NotFound, то IgnoreNotFound как раз вернет nil, иначе саму ошибку.

Если же ресурс был найден, то выведем в логи namespace ресурса и его тип из spec:

  // <...>
  log.V(1).Info("Namespace: " + contactPoint.Namespace)
  log.V(1).Info("Type: " + string(contactPoint.Spec.Type))

  return ctrl.Result{}, nil
}

Сейчас функция выглядит так:

internal/controller/contactpoint_controller.go
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch

func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)

	var contactPoint tgv1.ContactPoint

	if err := r.Get(ctx, req.NamespacedName, &contactPoint); err != nil {
		log.V(1).Info("ContactPoint was deleted")
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	log.V(1).Info("Namespace: " + contactPoint.Namespace)
	log.V(1).Info("Type: " + string(contactPoint.Spec.Type))

	return ctrl.Result{}, nil
}

Время для запуска нашего оператора!

Первый запуск оператора

Для начала давайте опишем сам кастомный ресурс в файле config/samples/tg_v1_contactpoint.yaml :

apiVersion: tg.azamaton.ru/v1
kind: ContactPoint
metadata:
  labels:
    app.kubernetes.io/name: habr-k8s-resource-tracker
    app.kubernetes.io/managed-by: kustomize
  name: telegram-cp
  namespace: default
spec:
  type: Telegram
  telegramSpec:
    chatId: 12345678
  apiToken:
    secretName: telegram-cp-token
    key: token

Теперь соберем Docker образ и запушим его в DockerHub:

$ make docker-build docker-push IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.0 

Начнется процесс сборки образа, а затем его загрузки в реестр. Дождемся завершения и наконец задеплоим оператор в Kubernetes кластер:

$ make deploy IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.0

Проверим, что Deployment перешел в статус Running:

$ kubectl -n habr-k8s-resource-tracker-system get pods
NAME                                                           READY   STATUS    RESTARTS   AGE
habr-k8s-resource-tracker-controller-manager-767b8695d-wcds9   1/1     Running   0          3m49s

Теперь задеплоим выше описанный ContactPoint:

$ kubectl apply -f ./config/samples/tg_v1_contactpoint.yaml 
contactpoint.tg.azamaton.ru/telegram-cp created

Проверим логи оператора:

<...>
2025-02-05T12:38:52Z    DEBUG   Namespace: default      {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "6a4d4a49-298c-4366-8fea-df63753e7a21"}
2025-02-05T12:38:52Z    DEBUG   Type: Telegram  {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "6a4d4a49-298c-4366-8fea-df63753e7a21"}

Теперь удалим ресурс и проверим логи снова:

2025-02-05T12:44:44Z    DEBUG   ContactPoint was deleted        {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "4773e7c6-9994-490c-a940-828b5bbda1f5"}

Реализация отправки уведомления в чат

Создадим директорию internal/alert. Тут же опишем файл types.go:

package alert

type ContactPoint interface {
	SendAlert(string) (interface{}, error)
}

type TelegramSendMessageBody struct {
	Text   string `json:"text"`
	ChatID int    `json:"chat_id"`
}

type TelegramSendMessageResponse struct {
	Ok     bool              `json:"ok"`
	Result SendMessageResult `json:"result"`
}

type SendMessageResult struct {
	MessageID int         `json:"message_id"`
	From      interface{} `json:"from"`
	Chat      interface{} `json:"chat"`
	Date      int64       `json:"date"`
	Text      string      `json:"text"`
}

Тут опишем интерфейс ContactPoint с функцией SendAlert(string), а также структуры, необходимые при работе с уведомлениями.

Реализуем TelegramContactPoint в файле internal/alert/telegram.go. Опишем константу с URL Telegram API и структуру TelegramContactPoint. Данная структура будет реализовывать интерфейс ContactPoint:

const TELEGRAM_BASE_API_URL = "https://api.telegram.org/bot%s"

type TelegramContactPoint struct {
	ChatID   int
	APIToken string
}

Реализуем метод SendAlert. Он принимает на вход строку message, остальные параметры (айди чата и токен бота) будут передаваться через поля структуры. Возвращает же функция ответ запроса и ошибку (при ее возникновений).

func (cp *TelegramContactPoint) SendAlert(message string) (interface{}, error) {
	if cp == nil {
		return nil, errors.New("TelegramContactPoint receiver is nil")
	}

	telegramAPIUrl := fmt.Sprintf(TELEGRAM_BASE_API_URL+"/sendMessage", cp.APIToken)
	body, err := json.Marshal(TelegramSendMessageBody{Text: message, ChatID: cp.ChatID})

	if err != nil {
		return nil, errors.New("failed to marshal request body")
	}

	req, err := http.NewRequest("POST", telegramAPIUrl, bytes.NewBuffer(body))
	if err != nil {
		return nil, errors.New("failed to create HTTP request")
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)

	if err != nil || resp.StatusCode != 200 {
		fmt.Println("Status code is " + strconv.Itoa(resp.StatusCode))
		return nil, errors.New("HTTP request failed")
	}

	defer resp.Body.Close()

	responseBody, err := io.ReadAll(resp.Body)

	if err != nil {
		return nil, errors.New("failed to read response body")
	}

	sendMessageResponse := TelegramSendMessageResponse{}
	err = json.Unmarshal(responseBody, &sendMessageResponse)
	if err != nil {
		return nil, errors.New("failed to unmarshal response")
	}

	return sendMessageResponse, nil
}

Дополнительно создадим структуру еще и для отправки уведомлении через вебхук в файле internal/alert/webhook.go. Реализовывать ее не будем, добавим ее лишь для того, чтобы продемонстрировать возможность добавлять новые типы для Contact point-ов:

type WebhookContactPoint struct {
	URL        string
	APIToken   string
	HeaderName string
}

func (cp *WebhookContactPoint) SendAlert(message string) (interface{}, error) {
	return nil, errors.New("webhook alerting is not available yet")
}

Так как ContactPointType у нас выступает отдельным типом (хоть и строкой), то добавим функцию для получения ContactPoint по данному значению. Это нужно для удобства и принципа DRY, так как использовать этот функционал мы будем в нескольких местах.

Заведем файл internal/alert/contactpoint.go со следующим содержимым:

func GetContactPointByType(cp v1.ContactPoint, apiToken string) ContactPoint {
	var contactPointService ContactPoint

	contactPointService = &TelegramContactPoint{
		ChatID:   cp.Spec.TelegramSpec.ChatId,
		APIToken: apiToken,
	}

	if cp.Spec.Type == tgv1.WebhookType {
		contactPointService = &WebhookContactPoint{
			URL:        cp.Spec.WebhookSpec.Url,
			APIToken:   apiToken,
			HeaderName: cp.Spec.WebhookSpec.HeaderName,
		}
	}

	return contactPointService
}

Логика проста: получаем на вход объект ContactPoint и на основе cp.Spec.Type выбираем нужный ContactPoint. По умолчанию возвращается TelegramContactPoint.

По логике отправки уведомлении мы закончили. Вернемся к контроллеру.

Допиливание контроллера для ContactPoint

Еще не забыли что мы добавили в функцию Reconcile? Вспоминайте, потому что сейчас мы будем добавлять новый код =)

Добавим переменную с типом Secret, в которой как раз будет храниться API Token. Сразу же добавим и targetSecretLocation с типом NamespacedName, который используется для поиска ресурса по его названию и пространству окружения. Название мы будем брать из spec-а нашего кастомного ресурса ContactPoint, а пространство имен укажем то же самое, где у нас деплоится оператор:

func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
  // <...>
  var secretWithAPIToken v1.Secret
  
  targetSecretLocation := types.NamespacedName{
      Name:      contactPoint.Spec.ApiToken.SecretName,
      Namespace: "habr-k8s-resource-tracker-system",
  }
  
  return ctrl.Result{}, nil
}

Через уже знакомый метод r.Get получим секрет. Если он не будет найден, то вернем ошибку с информацией о том, что секрет с указанным названием найти не получилось:

if err := r.Get(ctx, targetSecretLocation, &secretWithAPIToken); err != nil {
    log.Error(err, "unable to get secret with api token by spec")
    return ctrl.Result{}, client.IgnoreNotFound(err)
}

Далее проверим значение contactPoint.Status.Ready. Так как функция Reconcile должна быть индопонентна, то мы при каждом ее вызове необходимо проверять статус ресурса. Если значение false, то сначала получим соотвествующий типу ContactPoint, а затем вызовем метод SendAlert. Если метод вернет ошибку, то статус останется false. Если же err == nil, то назначим статус готовности true. Значение для поля Initialized в любом случае установится true:

if !contactPoint.Status.Ready {
    var contactPointService alert.ContactPoint

    contactPointService = alert.GetContactPointByType(contactPoint,
        string(secretWithAPIToken.Data[contactPoint.Spec.ApiToken.Key]))

    _, err := contactPointService.SendAlert("Contact point is ready")

    if err != nil {
        contactPoint.Status = tgv1.ContactPointStatus{Ready: false, Initialized: true}
        log.Error(err, "send alert return error: "+err.Error())
    } else {
        contactPoint.Status = tgv1.ContactPointStatus{Ready: true, Initialized: true}
    }
}

Еще обратите внимание на то, каким образом передается APIToken в метод GetContactPointByType. У экземпляра Secret есть поле Data, оно представлено типом данным map. Отсюда по ключу мы можем вытащить любой нужное значение.

После блока if обновим ресурс при помощи метода r.Status().Update. В качестве аргументов передаем уже знакомый нам контекст и указатель на измененный ресурс:

if err := r.Status().Update(ctx, &contactPoint); err != nil {
    log.Error(err, "unable to update ContactPoint status")
    return ctrl.Result{}, err
}

Теперь давайте тестить!

Тестирование готового контроллера для ContactPoint

Соберем и задеплоим новую версию оператора в кластер:

$ make docker-build docker-push IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.1
$ make deploy IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.1

Для начала добавим ContactPoint с несуществующим секретом и случайным chatId:

apiVersion: tg.azamaton.ru/v1
kind: ContactPoint
metadata:
  labels:
    app.kubernetes.io/name: habr-k8s-resource-tracker
    app.kubernetes.io/managed-by: kustomize
  name: telegram-cp
  namespace: default
spec:
  type: Telegram
  telegramSpec:
    chatId: 12345678
  apiToken:
    secretName: telegram-cp-token
    key: token

Проверим логи:

 ERROR   unable to get secret with api token by spec     {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "9ab6bcba-f4d7-4e0e-83ff-cc5e1864a6bc", "error": "Secret \"telegram-cp-token\" not found"}

Проверка на наличие секрета сработала. Теперь добавим сам секрет (с актуальным токеном) и пересоздадим ресурс:

 $ kubectl -n habr-k8s-resource-tracker-system create secret generic telegram-cp-token \ 
        --from-literal=token="12345678:ABCDEFUABCDEFUABCDEFUABCDEFU"
secret/telegram-cp-token created

Проверим логи снова:

ERROR   send alert return error: HTTP request failed    {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "dcca6b74-17ca-48ce-a6af-6020e9b99eb3", "error": "HTTP request failed"}

Теперь проблема с HTTP запросом. А все потому что у нас нет прав отправлять сообщения в чат с ID 12345678. Изменим ID чата на тот, куда бот имеет право отправить алерт. Сразу же после изменения прилетит сообщение в указанный чат, что Contact Point готов. При этом в логах будет пусто:

Проверим Status ресурса:

$ kubectl get ContactPoint/telegram-cp -o json | jq .status   
{
  "initialized": true,
  "ready": true
}

Contact Point готов!

Реализация контроллера для TrackedField

Теперь, когда контроллер для ContactPoint работает как нужно, мы можем приступить к реализации нового кастомного ресурса: TrackedField.

По аналогии с типом contact point, мы заложим возможность добавлять новые ресурсы и поля для отслеживания. Но в данной статье мы ограничимся лишь полем replicas у ресурса Deployment.

Инициализируем ресурс и контроллер:

$ kubebuilder create api --group tg --version v1 --kind TrackedField
INFO Create Resource [y/n]                        
y
INFO Create Controller [y/n]                      
y
INFO Writing kustomize manifests for you to edit... 
INFO Writing scaffold for you to edit...          
INFO api/v1/trackedfield_types.go                 
INFO api/v1/groupversion_info.go                  
INFO internal/controller/suite_test.go            
INFO internal/controller/trackedfield_controller.go 
INFO internal/controller/trackedfield_controller_test.go
<...>

Опишем структуру для ресурса. Сначала добавим два новых типа: ActionType и TargetKind. Первый будет содержать действие над ресурсов (создан, модицифирован или удален), а второй название ресурса (для примера помимо Deployment добавим еще Service и StatefulSet). Еще добавим структуру ResourceTarget, по полям которой мы будем определять какой ресурс необходимо отслеживать.

type ActionType string
type TargetKind string

const (
	ResourceCreated ActionType = "Created"
	ResourceUpdated ActionType = "Updated"
	ResourceDeleted ActionType = "Deleted"

	DeploymentKind  TargetKind = "Deployment"
	StatefulSetKind TargetKind = "StatefulSet"
	ServiceKind     TargetKind = "Service"
)

type ResourceTarget struct {
	Kind      TargetKind `json:"kind"`
	Namespace string     `json:"namespace"`
	Name      string     `json:"name"`
}

В spec опишем добавим поля для ресурса, которое нужно отслеживать, название ContactPoint, по которому нужно отправлять алерты и поле, при изменении которого необходимо уведомлять:

type TrackedFieldSpec struct {
	Target       ResourceTarget `json:"target"`
	ContactPoint string         `json:"contactPoint"`
	Field        string         `json:"field"`
}

Поле статуса у ресурса будет списком. В него контроллер будет добавлять каждое действие над полем отслеживаемого ресурса:

type TrackedFieldStatus struct {
	Time   *metav1.Time `json:"time"`
	Value  string       `json:"value,omitempty"`
	Action ActionType   `json:"action"`
}

TrackedField и TrackedFieldList:

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status

// TrackedField is the Schema for the trackedfields API.

type TrackedField struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   TrackedFieldSpec     `json:"spec"`
	Status []TrackedFieldStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// TrackedFieldList contains a list of TrackedField.
type TrackedFieldList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []TrackedField `json:"items"`
}

Сгенерируем CRD и добавим его в кластер:

$ make manifests
$ make install
customresourcedefinition.apiextensions.k8s.io/contactpoints.tg.azamaton.ru unchanged
customresourcedefinition.apiextensions.k8s.io/trackedfields.tg.azamaton.ru created

Приступим к контроллеру. Он находится в файле internal/controller/trackedfield_controller.go.

Так как мы будем взаимодействовать с некоторыми сторонними ресурсами, то добавим необходимые права доступа к ним из контроллера:

// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=trackedfields,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=trackedfields/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=trackedfields/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=services/status,verbs=get

Сразу же объявим функцию addNewStatus() внутри Reconcile, т.к вызывать ее мы будем несколько раз. С помощью нее мы будем добавлять новые элементы в статус ресурса:

func (r *TrackedFieldReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := log.FromContext(ctx)

	addNewStatus := func(trackedField *tgv1.TrackedField, action tgv1.ActionType, value string) error {
		newStatus := tgv1.TrackedFieldStatus{
			Time:   &metav1.Time{Time: time.Now()},
			Action: action,
			Value:  value,
		}
		trackedField.Status = append(trackedField.Status, newStatus)

		if err := r.Update(ctx, trackedField); err != nil {
			log.Error(err, "unable to update TrackedField status")
			return err
		}

		return nil
	}
    <...>
}

Объявим несколько переменных:

// CP для отправки уведомления в чат (tg/webhook/etc)
var contactPointService alert.ContactPoint

// кастомный ресурс, который нужно привести к желаемому состоянию
var trackedField tgv1.TrackedField
// кастомный ресурс ContactPoint
var contactPoint tgv1.ContactPoint
// секрет с API Token для авторизации запросов на отправку уведомлении
var secretWithAPIToken corev1.Secret
// массив со статусом ресурса TrackedField
var trackedFieldStatus []tgv1.TrackedFieldStatus
// ресурс K8s который необходимо отслеживать
var targetResource interface{}
// значение отслеживаемого поля
var valueOfTrackedField string

Получим ресурс TrackedField:

if err := r.Get(ctx, req.NamespacedName, &trackedField); err != nil {
    log.V(1).Info("TrackedField was deleted")
    return ctrl.Result{}, client.IgnoreNotFound(err)
}

Создадим три объекта NamespacedName: для поиска ресурсов ContactPoint, Secret и отслеживамого ресурса (в нашем случае Deployment). Сразу после объявления targetContactPointLocation попробуем получить ContactPoint :

targetContactPointLocation := types.NamespacedName{
    Name:      trackedField.Spec.ContactPoint,
    Namespace: req.Namespace,
}

if err := r.Get(ctx, targetContactPointLocation, &contactPoint); err != nil {
    log.V(1).Info("ContactPoint is not found by spec")
    return ctrl.Result{}, client.IgnoreNotFound(err)
}

targetResourceLocation := types.NamespacedName{
    Name:      trackedField.Spec.Target.Name,
    Namespace: trackedField.Spec.Target.Namespace,
}

targetSecretLocation := types.NamespacedName{
    Name:      contactPoint.Spec.ApiToken.SecretName,
    Namespace: "habr-k8s-resource-tracker-system",
}

Если ContactPoint не готов (!Status.Ready), то вернем ctrl.Result с полем RequeueAfter: time.Minute. Это укажет оператору запустить процесс reconciliation через минуту. Это нужно для того, чтобы если вдруг ContactPoint будет готов отправлять уведомления, то пользователю не пришлось пересоздавать ресурс TrackedField.

if !contactPoint.Status.Ready {
    log.V(1).Info("Contact point is not ready")
    return ctrl.Result{RequeueAfter: time.Minute}, nil
}

Получим секрет c токеном как мы это делали ранее в контроллере ContactPoint:

if err := r.Get(ctx, targetSecretLocation, &secretWithAPIToken); err != nil {
    log.Error(err, "unable to get secret with api token by spec")
    return ctrl.Result{}, client.IgnoreNotFound(err)
}

Переменная targetResource не просто так имеет тип interface{}. Это означает, что она может содержать значение любого типа. На основе значения поля TrackedField.Spec.Target.Kind мы будем записывать в данную переменную тип ресурса:

switch trackedField.Spec.Target.Kind {
case tgv1.DeploymentKind:
    targetResource = &v1.Deployment{}
case tgv1.ServiceKind:
    targetResource = &corev1.Service{}
}

Получим нужный ContactPoint при помощи ранее объявленной функции alert.GetContactPointByType:

contactPointService = alert.GetContactPointByType(contactPoint,
    string(secretWithAPIToken.Data[contactPoint.Spec.ApiToken.Key]))

Попробуем преобразовать targetResource в выбранный на основе switch/case тип:

obj, isOk := targetResource.(client.Object)

if !isOk {
    return ctrl.Result{}, errors.New("cannot cast target kind to resource")
}

Обработаем сценарии, когда отслеживаемый ресурс не найден. Если в последнем элементе статуса мы находим значение Action == Deleted, то ничего не изменяем, иначе добавляем новый статус, указывая, что теперь ресурс Deleted и отправляем уведомление в чат об этом. В любом случае указываем RequeueAfter в минуту:

trackedFieldStatus = trackedField.Status

if err := r.Get(ctx, targetResourceLocation, obj); err != nil {
    log.Error(err, "unable to get resource by TrackedField spec")
    lastStatus := trackedFieldStatus[len(trackedFieldStatus)-1]

    if lastStatus.Action == tgv1.ResourceDeleted {
        return ctrl.Result{RequeueAfter: time.Minute}, nil
    }

    addNewStatus(&trackedField, tgv1.ResourceDeleted, "")
    _, err = contactPointService.SendAlert("Resource kind: " + string(trackedField.Spec.Target.Kind) + "\n" +
        "Name: " + trackedField.Spec.Target.Name + "\n" +
        "Status: " + string(tgv1.ResourceDeleted) + "\n" +
        "Value: " + "-1")
    return ctrl.Result{RequeueAfter: time.Minute}, nil
}

Теперь добавим switch/case, но для получения значения отслеживаемого поля. Если Field равен replicas или image, то будем получать значения данных полей из Deployment, targetPort - из Service :

switch trackedField.Spec.Field {
case "replicas":
    valueOfTrackedField = strconv.Itoa(int(*obj.(*v1.Deployment).Spec.Replicas))
case "image":
    valueOfTrackedField = obj.(*v1.Deployment).Spec.Template.Spec.Containers[0].Image
case "targetPort":
    valueOfTrackedField = obj.(*corev1.Service).Spec.Ports[0].TargetPort.StrVal
}

Теперь сделаем проверку на то, является ли поле новым (нет предыдущего статуса) или отличается ли его текущее значение от последнего записанного. Если одно из этих условий верно, контроллер приступает к обработке изменения.

По умолчанию действие над полем устанавливается в ResourceCreated. Но если существует предыдущий статус и его action не равен ResourceDeleted, то это означает, что поле было обновлено. Поэтому мы перезаписываем action = ResourceUpdated. В любом случае добавляется новый статус в массив Status. Если обновление ресурса произошло без ошибок, то отправляем уведомление в чат с записанным action и значением отслеживаемого поля:

if len(trackedFieldStatus) < 1 ||
    (trackedFieldStatus[len(trackedFieldStatus)-1].Value != valueOfTrackedField) {

    action := tgv1.ResourceCreated
    if len(trackedFieldStatus) > 0 {
        lastStatus := trackedFieldStatus[len(trackedFieldStatus)-1]
        if lastStatus.Action != tgv1.ResourceDeleted {
            action = tgv1.ResourceUpdated
        }
    }

    err := addNewStatus(&trackedField, action, valueOfTrackedField)
    if err != nil {
        return ctrl.Result{}, err
    }

    _, err = contactPointService.SendAlert("Resource kind: " + string(trackedField.Spec.Target.Kind) + "\n" +
        "Name: " + trackedField.Spec.Target.Name + "\n" +
        "Status: " + string(action) + "\n" +
        "Value: " + valueOfTrackedField)
    if err != nil {
        return ctrl.Result{}, err
    }
}

return ctrl.Result{RequeueAfter: time.Minute}, nil

На этом моменте контроллер для TrackedField готов. Обновим оператор в кластере и гоу тестировать!

$ make docker-build docker-push IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.2
$ make deploy IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.2

Запуск оператора

Оставим ContactPoint нетронутым. В файл config/samples/tg_v1_trackedfield.yaml добавим спецификацию TrackedField:

apiVersion: tg.azamaton.ru/v1
kind: TrackedField
metadata:
  labels:
    app.kubernetes.io/name: habr-k8s-resource-tracker
    app.kubernetes.io/managed-by: kustomize
  name: nginx-deployment-replicas
spec:
  contactPoint: telegram-chat
  field: replicas
  target:
    kind: Deployment
    name: nginx
    namespace: default

В ту же директорию добавим файл deploy.yaml с ресурсом Deployment с одной репликой и образом nginx:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx
  namespace: default
  labels:
    app: nginx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
        - name: nginx
          image: nginx:latest
          ports:
            - containerPort: 80

Сначала создадим секрет с API Token и добавим Deployment и ContactPoint:

$ kubectl -n habr-k8s-resource-tracker-system create secret generic telegram-cp-token \ 
        --from-literal=token="12345678:ABCDEFUABCDEFUABCDEFUABCDEFU"
secret/telegram-cp-token created

$ kubectl apply -f ./config/samples/deploy.yaml -f ./config/samples/tg_v1_contactpoint.yaml
deployment.apps/nginx created
contactpoint.tg.azamaton.ru/telegram-cp created

В чат моментально пришло уведомление о готовности cp. Теперь добавим TrackedField:

$ kubectl apply -f ./config/samples/tg_v1_trackedfield.yaml                                
trackedfield.tg.azamaton.ru/nginx-deployment-replicas created

В чат пришло уведомление о том, что был создан соответствующий ресурс со значением replicas: 1 :

Поле со статусом так же было обновлено:

$ kubectl get TrackedField/nginx-deployment-replicas -o json | jq .status
[
  {
    "action": "Created",
    "time": "2025-02-11T20:44:21Z",
    "value": "1"
  }
]

Увеличим кол-во реплик до 3:

$ kubectl scale deployment nginx --replicas 3
deployment.apps/nginx scaled

На этот раз алерт может прийти не мгновенно, потому что мы сами указали RequeueAfter: time.Minute . Поэтому подождем около минуты и получим результат:

$ kubectl get TrackedField/nginx-deployment-replicas -o json | jq .status
[
  {
    "action": "Created",
    "time": "2025-02-11T20:44:21Z",
    "value": "1"
  },
  {
    "action": "Updated",
    "time": "2025-02-11T20:51:04Z",
    "value": "3"
  }
]

Удалим Deployment. Немного терпения и снова уведомление и обновление статуса!

$ kubectl get TrackedField/nginx-deployment-replicas -o json | jq .status
[
  {
    "action": "Created",
    "time": "2025-02-11T20:44:21Z",
    "value": "1"
  },
  {
    "action": "Updated",
    "time": "2025-02-11T20:51:04Z",
    "value": "3"
  },
  {
    "action": "Deleted",
    "time": "2025-02-11T20:54:04Z"
  }
]

Конец

В данной статье я постарался показать как можно создать свой собственный простенький оператор для Kubernetes. Конечно, kubebuilder достаточно мощный инструмент и, возможно, избыточен для решения задачи, которую мы поставили и решили написанием данного оператора. Но я доволен тем, что смог «пощупать» возможности kubebuilder‑а сам, на своем собственном операторе, что может быть полезно в будущем. Всем удачи! =)

Комментарии (0)