Вступление
Всем привет! Я стал замечать на себе, что чаще решаю задачи, связанные с эксплуатацией инфраструктуры (Ops), чем с разработкой. Это видно по моим статьям — они в основном посвящены настройке и развертыванию различных инструментов. Однако «Dev»-составляющая тоже важна, и поэтому я решил прокачиваться еще и в разработке!
В этой статье я хочу поделиться опытом создания Kubernetes-оператора на Go. Этот язык часто используют для подобных задач, а сама идея родилась из практической потребности — автоматизировать отслеживание изменений в кластере и отправку уведомлений в Telegram. Сначала мы разберемся со структурой оператора, затем опишем кастомные ресурсы (Custom Resources), реализуем для них контроллеры и наконец развернем готовый оператор в тестовом кластере. Приступим!
Оглавление
Оглавление
Критика - это хорошо!

Автор не является Go-разработчиком, поэтому если вы заметили недочеты в коде или терминологии, то комментарии (или ЛС) всегда открыты. Спасибо за обратную связь!
Что такое операторы?
Kubernetes из коробки предлагает достаточный объем инструментов для развертывания сервисов. С их помощью можно покрыть почти что все потребности.
Но иногда встроенных возможностей куба не хватает. Тогда на помощь приходят операторы. Например, возьмем CloudNativePG (CNPG), один из операторов для развертывания PostgreSQL в Kubernetes. Установить его можно через kubectl или helm. Так оператор берет на себя конфигурирование и поддержку СУБД, включая хранение настройку, бэкапирование, репликацию и fail-over. Вам необходимо лишь описать в YAML формате ресурс c необходимыми параметрами.

При установке оператора в ваш кластер так же добавляются Custom Resource Definition. Список кастомных ресурсов, которые предоставляет CNPG, можно найти тут.
Рассмотрим в качестве примере Cluster. Именно тут описываются основные параметры инстанса СУБД - кол-во реплик, данные пользователей, настройки для конкретной базы данных и прочее. Вот так это выглядит:
apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
name: postgresql
namespace: default
spec:
instances: 3
imageName: ghcr.io/cloudnative-pg/postgresql:16.2-14
storage:
pvcTemplate:
resources:
requests:
storage: 5Gi
resources:
requests:
memory: "1Gi"
cpu: 1
limits:
memory: "1Gi"
cpu: 1
После применения манифеста оператор развернет три пода (один мастер, два слейва), сервисы для подключения, секреты с паролями, а так же сообщит, что кластер находится в запущенном состоянии:
$ kubectl get Cluster
NAME AGE INSTANCES READY STATUS PRIMARY
postgresql 119s 3 3 Cluster in healthy state postgresql-1
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
postgresql-1 1/1 Running 0 71s
postgresql-2 1/1 Running 0 50s
postgresql-3 1/1 Running 0 38s
PostgreSQL готов к использованию!
Идея собственного оператора

В этой статье я напишу оператор, который будет отслеживать определенные ресурсы Kubernetes, а затем отправлять о них информацию в Telegram-чат. Для этого я разверну базовый шаблон оператора на Golang через kubebuilder, опишу какие кастомные ресурсы для своего оператора, а затем разверну в кластере k8s. Погнали!
Ссылка на репо с исходным кодом: тут.
Инициализация оператора
Для создания оператора я воспользуюсь kubebuilder. После установки утилиты инициализируем оператор:
$ mkdir k8s-resource-tracker-operator
$ cd ./k8s-resource-tracker-operator
$ kubebuilder init --domain azamaton.ru --repo github.com/AzamatKomaev/k8s-resource-tracker-operator
В текущей директорий должна была появиться следующая структура:
cmd/main.go — точка запуска оператора. Тут описан пакет main и функция main.
api/ — директория, которая содержит Custom Resource Definitions (CRDs) и соответствующие им типы на Go.
config/default — директория с kustomization.yaml, который используется для установки оператора в кластер.
config/manager — директория с ресурсом Deployment для запуска оператора.
config/network‑policy — директория с ресурсами NetworkPolicy. По‑умолчанию содержит разрешение на получение метрик оператора из того же namespace.
config/prometheus — директория с ресурсами ServiceMonitor. Если у вас установлен Prometheus, то метрики из оператора автоматически будут скрапиться прометеусом.
config/rbac — директория с ресурсами для Kubernetes RBAC (SA, Role, RoleBinding, etc), необходимые оператору.
bin/ — это директория содержит исполняемые файлы, например kustomize.
test/ — директория с e2e тестами.
internal/ — директория, содержащая имплементацию контроллеров для определенных кастомных ресурсов (CR).
А теперь приступим к описанию Custom Resource Definition!
Описание структуры для ContactPoint
CRD (Custom Resource Definition) служит для создания своих собственных ресурсов. У моего оператора будут два ресурса: ContactPoint
и TrackedField
. С помощью ContactPoint
будет возможность задавать место, куда будут приходить уведомления об изменений поля. TrackedField
же, в свою очередь, ресурс и поле, за которым необходимо следить и уведомлять в чат. Начнем с ContactPoint
.
Для начала с помощью команды kubebuilder create api
создадим данный ресурс:
$ kubebuilder create api --group tg --version v1 --kind Conta
INFO Create Resource [y/n]
y
INFO Create Controller [y/n]
y
INFO Writing kustomize manifests for you to edit...
INFO Writing scaffold for you to edit...
INFO api/v1/contactpoint_types.go
INFO api/v1/groupversion_info.go
INFO internal/controller/suite_test.go
INFO internal/controller/contactpoint_controller.go
INFO internal/controller/contactpoint_controller_test.go
<...>
Создаем и ресурс, и контроллер. kubebuilder добавил базовую заготовку для нашего ресурса в директорию api/v1/
, а так же имплементацию контроллера для этого ресурса в internal/controller. Взгялнем на содержимое contactpoint_types.go
:
api/v1/contactpoint_types.go
/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized.
// ContactPointSpec defines the desired state of ContactPoint.
type ContactPointSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
// Foo is an example field of ContactPoint. Edit contactpoint_types.go to remove/update
Foo string `json:"foo,omitempty"`
}
// ContactPointStatus defines the observed state of ContactPoint.
type ContactPointStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file
}
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// ContactPoint is the Schema for the contactpoints API.
type ContactPoint struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ContactPointSpec `json:"spec,omitempty"`
Status ContactPointStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// ContactPointList contains a list of ContactPoint.
type ContactPointList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ContactPoint `json:"items"`
}
func init() {
SchemeBuilder.Register(&ContactPoint{}, &ContactPointList{})
}
У каждого ресурса в K8s должны быть поля apiVersion, kind, metadata, spec и status. Начнем описывать свой собственный ресурс. Для начала укажем типы ContactPoint-ов, куда у нас будет возможность отправлять уведомления. Для начала хватит Telegram и Webhook
type ContactPointType string
const (
TelegramType ContactPointType = "Telegram"
WebhookType ContactPointType = "Webhook"
)
Для каждого из типов CP (ContactPoint) будет своя спецификация. Для Telegram мы должны будем указать Chat ID, тогда как для Webhook ссылку и название заголовка, который следует передавать при запросе.
type ContactPointTelegramSpec struct {
ChatId int `json:"chatId"`
}
type ContactPointWebhookSpec struct {
Url string `json:"url"`
HeaderName string `json:"headerName"`
}
Для отправки сообщения нам нужно передавать еще и api token. Хранить его в коде плохая практика, поэтому брать токен будем из ресурса типа Secret:
type ContactPointApiToken struct {
SecretName string `json:"secretName"`
Key string `json:"key"`
}
Теперь опишем полную спецификаю ресурса:
type ContactPointSpec struct {
Type ContactPointType `json:"type"`
ApiToken ContactPointApiToken `json:"apiToken"`
WebhookSpec ContactPointWebhookSpec `json:"webhookSpec,omitempty"`
TelegramSpec ContactPointTelegramSpec `json:"telegramSpec,omitempty"`
}
Для полей WebhookSpec и TelegramSpec в параметрах передается значение omitempty, которое указывает, что поле опционально.
Теперь опишем поле для статуса. У нас будет два булевых значения. Initialized будет устанавливаться на true при самом первом запуске функции Reconcile (об этом дальше). Ready будет указывать на то, что ContactPoint готов отправлять уведомления в заданный чат. По-умолчанию оба значения равны false
type ContactPointStatus struct {
Initialized bool `json:"initialized"`
Ready bool `json:"ready"`
}
И наконец весь ресурс со всеми полями. Обратите внимание на комментарии, которые начинаются на +kubebuilder:
. Они служат для того, чтобы kubebuilder понимал, что type ContactPoint является корневым ресурсом:
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// ContactPoint is the Schema for the contactpoints API.
type ContactPoint struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec ContactPointSpec `json:"spec,omitempty"`
Status ContactPointStatus `json:"status,omitempty"`
}
Последняя структура необходима для представления списка ресурсов, получаемых при операции list:
// +kubebuilder:object:root=true
// ContactPointList contains a list of ContactPoint.
type ContactPointList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []ContactPoint `json:"items"`
}
Структуры готовы, теперь осталось сгенерировать CRD:
$ make manifests
/home/azamat/programming/go/habr-k8s-resource-tracker/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
В папке config/crd/bases/
появился файл tg.azamaton.ru_contactpoints.yaml
, содержащий ресурс CRD. Он был сгенерирован на основе описанных выше Go-структур:
config/crd/bases/tg.azamaton.ru_contactpoints.yaml
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.16.4
name: contactpoints.tg.azamaton.ru
spec:
group: tg.azamaton.ru
names:
kind: ContactPoint
listKind: ContactPointList
plural: contactpoints
singular: contactpoint
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
description: ContactPoint is the Schema for the contactpoints API.
properties:
apiVersion:
description: |-
APIVersion defines the versioned schema of this representation of an object.
Servers should convert recognized schemas to the latest internal value, and
may reject unrecognized values.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources
type: string
kind:
description: |-
Kind is a string value representing the REST resource this object represents.
Servers may infer this from the endpoint the client submits requests to.
Cannot be updated.
In CamelCase.
More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds
type: string
metadata:
type: object
spec:
properties:
apiToken:
properties:
key:
type: string
secretName:
type: string
required:
- key
- secretName
type: object
telegramSpec:
properties:
chatId:
type: integer
required:
- chatId
type: object
type:
type: string
webhookSpec:
properties:
headerName:
type: string
url:
type: string
required:
- headerName
- url
type: object
required:
- apiToken
- type
type: object
status:
properties:
initialized:
type: boolean
ready:
type: boolean
required:
- initialized
- ready
type: object
type: object
served: true
storage: true
subresources:
status: {}
Теперь добавим CRD в наш кластер. Если выполнять команду make install
впервые, то kubebuilder установит в папку bin/
утилиту kustomize
:
$ make install
/home/azamat/programming/go/habr-k8s-resource-tracker/bin/controller-gen rbac:roleName=manager-role crd webhook paths="./..." output:crd:artifacts:config=config/crd/bases
Downloading sigs.k8s.io/kustomize/kustomize/v5@v5.5.0
/home/azamat/programming/go/habr-k8s-resource-tracker/bin/kustomize build config/crd | kubectl apply -f -
customresourcedefinition.apiextensions.k8s.io/contactpoints.tg.azamaton.ru created
Проверим, что CRD добавился:
$ kubectl get CustomResourceDefinition/contactpoints.tg.azamaton.ru
NAME CREATED AT
contactpoints.tg.azamaton.ru 2025-01-31T14:11:35Z
Реализация контроллера для ContactPoint
Теперь займемся логикой. Сейчас код контроллера для созданного ресурса выглядит следующим образом:
internal/controller/contactpoint_controller.go
/*
Copyright 2025.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
tgv1 "github.com/AzamatKomaev/k8s-resource-tracker-operator/api/v1"
)
// ContactPointReconciler reconciles a ContactPoint object
type ContactPointReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/finalizers,verbs=update
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the ContactPoint object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.1/pkg/reconcile
func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
_ = log.FromContext(ctx)
// TODO(user): your logic here
return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
func (r *ContactPointReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&tgv1.ContactPoint{}).
Named("contactpoint").
Complete(r)
}
Больше всего тут нас интересует функция Reconcile. Именно в ней мы будем синхронизировать текущее состояние кастомного ресурса с желаемым.
Начнем с комментариев над фукнцией. С помощью комментария +kubebuilder:rbac
мы можем указать какие RBAC роли нужны будут нашему оператору. По-умолчанию указаны полные права на взаимодействие с ресурсом ContactPoint. Дополнительно добавим роль для получения секретов, это необходимо для получения API-токена:
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
Функция Reconcile принимает два аргумента: ctx context.Context
и req ctrl.Request
. Context необходим для различных API действий, например для отмены запроса или его переноса. Request же содежит информацию о текущем ресурсе.
Начнем с простого: получение кастомного ресурса. Получим log чтобы выводить необходимую информацию в консоль и создадим переменную для будущего кастомного ресурса:
func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
var contactPoint tgv1.ContactPoint
// <...>
}
У объекта Reconciler
есть метод Get
, в который нужно передать текущий контекст, объект NamespacedName
и ссылку на ресурс. Про контекст я уже рассказывал чуть выше. NamespacedName
же содержит два поля: Name и Namespace. Данных полей достаточно, чтобы найти ресурс в нашем кластере. Для кастомного ресурса мы будем брать его из объекта ctrl.Request
:
// <...>
if err := r.Get(ctx, req.NamespacedName, &contactPoint); err != nil {
log.V(1).Info("ContactPoint was deleted")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
Сразу же проверяем err в блоке if. Если ресурс не был найден, то это означает, что его удалили. Поэтому в логах выводим соотвествующую информацию и возвращаем пустой объект Result
и ошибку, переданную в client.IgnoreNotFound
. Если ошибка NotFound, то IgnoreNotFound
как раз вернет nil, иначе саму ошибку.
Если же ресурс был найден, то выведем в логи namespace ресурса и его тип из spec:
// <...>
log.V(1).Info("Namespace: " + contactPoint.Namespace)
log.V(1).Info("Type: " + string(contactPoint.Spec.Type))
return ctrl.Result{}, nil
}
Сейчас функция выглядит так:
internal/controller/contactpoint_controller.go
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=contactpoints/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
var contactPoint tgv1.ContactPoint
if err := r.Get(ctx, req.NamespacedName, &contactPoint); err != nil {
log.V(1).Info("ContactPoint was deleted")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
log.V(1).Info("Namespace: " + contactPoint.Namespace)
log.V(1).Info("Type: " + string(contactPoint.Spec.Type))
return ctrl.Result{}, nil
}
Время для запуска нашего оператора!
Первый запуск оператора
Для начала давайте опишем сам кастомный ресурс в файле config/samples/tg_v1_contactpoint.yaml
:
apiVersion: tg.azamaton.ru/v1
kind: ContactPoint
metadata:
labels:
app.kubernetes.io/name: habr-k8s-resource-tracker
app.kubernetes.io/managed-by: kustomize
name: telegram-cp
namespace: default
spec:
type: Telegram
telegramSpec:
chatId: 12345678
apiToken:
secretName: telegram-cp-token
key: token
Теперь соберем Docker образ и запушим его в DockerHub:
$ make docker-build docker-push IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.0
Начнется процесс сборки образа, а затем его загрузки в реестр. Дождемся завершения и наконец задеплоим оператор в Kubernetes кластер:
$ make deploy IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.0
Проверим, что Deployment перешел в статус Running:
$ kubectl -n habr-k8s-resource-tracker-system get pods
NAME READY STATUS RESTARTS AGE
habr-k8s-resource-tracker-controller-manager-767b8695d-wcds9 1/1 Running 0 3m49s
Теперь задеплоим выше описанный ContactPoint:
$ kubectl apply -f ./config/samples/tg_v1_contactpoint.yaml
contactpoint.tg.azamaton.ru/telegram-cp created
Проверим логи оператора:
<...>
2025-02-05T12:38:52Z DEBUG Namespace: default {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "6a4d4a49-298c-4366-8fea-df63753e7a21"}
2025-02-05T12:38:52Z DEBUG Type: Telegram {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "6a4d4a49-298c-4366-8fea-df63753e7a21"}
Теперь удалим ресурс и проверим логи снова:
2025-02-05T12:44:44Z DEBUG ContactPoint was deleted {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "4773e7c6-9994-490c-a940-828b5bbda1f5"}
Реализация отправки уведомления в чат
Создадим директорию internal/alert
. Тут же опишем файл types.go
:
package alert
type ContactPoint interface {
SendAlert(string) (interface{}, error)
}
type TelegramSendMessageBody struct {
Text string `json:"text"`
ChatID int `json:"chat_id"`
}
type TelegramSendMessageResponse struct {
Ok bool `json:"ok"`
Result SendMessageResult `json:"result"`
}
type SendMessageResult struct {
MessageID int `json:"message_id"`
From interface{} `json:"from"`
Chat interface{} `json:"chat"`
Date int64 `json:"date"`
Text string `json:"text"`
}
Тут опишем интерфейс ContactPoint с функцией SendAlert(string)
, а также структуры, необходимые при работе с уведомлениями.
Реализуем TelegramContactPoint
в файле internal/alert/telegram.go
. Опишем константу с URL Telegram API и структуру TelegramContactPoint
. Данная структура будет реализовывать интерфейс ContactPoint
:
const TELEGRAM_BASE_API_URL = "https://api.telegram.org/bot%s"
type TelegramContactPoint struct {
ChatID int
APIToken string
}
Реализуем метод SendAlert
. Он принимает на вход строку message
, остальные параметры (айди чата и токен бота) будут передаваться через поля структуры. Возвращает же функция ответ запроса и ошибку (при ее возникновений).
func (cp *TelegramContactPoint) SendAlert(message string) (interface{}, error) {
if cp == nil {
return nil, errors.New("TelegramContactPoint receiver is nil")
}
telegramAPIUrl := fmt.Sprintf(TELEGRAM_BASE_API_URL+"/sendMessage", cp.APIToken)
body, err := json.Marshal(TelegramSendMessageBody{Text: message, ChatID: cp.ChatID})
if err != nil {
return nil, errors.New("failed to marshal request body")
}
req, err := http.NewRequest("POST", telegramAPIUrl, bytes.NewBuffer(body))
if err != nil {
return nil, errors.New("failed to create HTTP request")
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil || resp.StatusCode != 200 {
fmt.Println("Status code is " + strconv.Itoa(resp.StatusCode))
return nil, errors.New("HTTP request failed")
}
defer resp.Body.Close()
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, errors.New("failed to read response body")
}
sendMessageResponse := TelegramSendMessageResponse{}
err = json.Unmarshal(responseBody, &sendMessageResponse)
if err != nil {
return nil, errors.New("failed to unmarshal response")
}
return sendMessageResponse, nil
}
Дополнительно создадим структуру еще и для отправки уведомлении через вебхук в файле internal/alert/webhook.go
. Реализовывать ее не будем, добавим ее лишь для того, чтобы продемонстрировать возможность добавлять новые типы для Contact point-ов:
type WebhookContactPoint struct {
URL string
APIToken string
HeaderName string
}
func (cp *WebhookContactPoint) SendAlert(message string) (interface{}, error) {
return nil, errors.New("webhook alerting is not available yet")
}
Так как ContactPointType
у нас выступает отдельным типом (хоть и строкой), то добавим функцию для получения ContactPoint
по данному значению. Это нужно для удобства и принципа DRY, так как использовать этот функционал мы будем в нескольких местах.
Заведем файл internal/alert/contactpoint.go
со следующим содержимым:
func GetContactPointByType(cp v1.ContactPoint, apiToken string) ContactPoint {
var contactPointService ContactPoint
contactPointService = &TelegramContactPoint{
ChatID: cp.Spec.TelegramSpec.ChatId,
APIToken: apiToken,
}
if cp.Spec.Type == tgv1.WebhookType {
contactPointService = &WebhookContactPoint{
URL: cp.Spec.WebhookSpec.Url,
APIToken: apiToken,
HeaderName: cp.Spec.WebhookSpec.HeaderName,
}
}
return contactPointService
}
Логика проста: получаем на вход объект ContactPoint
и на основе cp.Spec.Type
выбираем нужный ContactPoint
. По умолчанию возвращается TelegramContactPoint
.
По логике отправки уведомлении мы закончили. Вернемся к контроллеру.
Допиливание контроллера для ContactPoint
Еще не забыли что мы добавили в функцию Reconcile
? Вспоминайте, потому что сейчас мы будем добавлять новый код =)
Добавим переменную с типом Secret
, в которой как раз будет храниться API Token. Сразу же добавим и targetSecretLocation
с типом NamespacedName
, который используется для поиска ресурса по его названию и пространству окружения. Название мы будем брать из spec-а нашего кастомного ресурса ContactPoint
, а пространство имен укажем то же самое, где у нас деплоится оператор:
func (r *ContactPointReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
// <...>
var secretWithAPIToken v1.Secret
targetSecretLocation := types.NamespacedName{
Name: contactPoint.Spec.ApiToken.SecretName,
Namespace: "habr-k8s-resource-tracker-system",
}
return ctrl.Result{}, nil
}
Через уже знакомый метод r.Get
получим секрет. Если он не будет найден, то вернем ошибку с информацией о том, что секрет с указанным названием найти не получилось:
if err := r.Get(ctx, targetSecretLocation, &secretWithAPIToken); err != nil {
log.Error(err, "unable to get secret with api token by spec")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
Далее проверим значение contactPoint.Status.Ready
. Так как функция Reconcile
должна быть индопонентна, то мы при каждом ее вызове необходимо проверять статус ресурса. Если значение false, то сначала получим соотвествующий типу ContactPoint
, а затем вызовем метод SendAlert
. Если метод вернет ошибку, то статус останется false. Если же err == nil
, то назначим статус готовности true. Значение для поля Initialized в любом случае установится true:
if !contactPoint.Status.Ready {
var contactPointService alert.ContactPoint
contactPointService = alert.GetContactPointByType(contactPoint,
string(secretWithAPIToken.Data[contactPoint.Spec.ApiToken.Key]))
_, err := contactPointService.SendAlert("Contact point is ready")
if err != nil {
contactPoint.Status = tgv1.ContactPointStatus{Ready: false, Initialized: true}
log.Error(err, "send alert return error: "+err.Error())
} else {
contactPoint.Status = tgv1.ContactPointStatus{Ready: true, Initialized: true}
}
}
Еще обратите внимание на то, каким образом передается APIToken в метод GetContactPointByType
. У экземпляра Secret
есть поле Data
, оно представлено типом данным map. Отсюда по ключу мы можем вытащить любой нужное значение.
После блока if обновим ресурс при помощи метода r.Status().Update
. В качестве аргументов передаем уже знакомый нам контекст и указатель на измененный ресурс:
if err := r.Status().Update(ctx, &contactPoint); err != nil {
log.Error(err, "unable to update ContactPoint status")
return ctrl.Result{}, err
}
Теперь давайте тестить!
Тестирование готового контроллера для ContactPoint
Соберем и задеплоим новую версию оператора в кластер:
$ make docker-build docker-push IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.1
$ make deploy IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.1
Для начала добавим ContactPoint
с несуществующим секретом и случайным chatId:
apiVersion: tg.azamaton.ru/v1
kind: ContactPoint
metadata:
labels:
app.kubernetes.io/name: habr-k8s-resource-tracker
app.kubernetes.io/managed-by: kustomize
name: telegram-cp
namespace: default
spec:
type: Telegram
telegramSpec:
chatId: 12345678
apiToken:
secretName: telegram-cp-token
key: token
Проверим логи:
ERROR unable to get secret with api token by spec {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "9ab6bcba-f4d7-4e0e-83ff-cc5e1864a6bc", "error": "Secret \"telegram-cp-token\" not found"}
Проверка на наличие секрета сработала. Теперь добавим сам секрет (с актуальным токеном) и пересоздадим ресурс:
$ kubectl -n habr-k8s-resource-tracker-system create secret generic telegram-cp-token \
--from-literal=token="12345678:ABCDEFUABCDEFUABCDEFUABCDEFU"
secret/telegram-cp-token created
Проверим логи снова:
ERROR send alert return error: HTTP request failed {"controller": "contactpoint", "controllerGroup": "tg.azamaton.ru", "controllerKind": "ContactPoint", "ContactPoint": {"name":"telegram-cp","namespace":"default"}, "namespace": "default", "name": "telegram-cp", "reconcileID": "dcca6b74-17ca-48ce-a6af-6020e9b99eb3", "error": "HTTP request failed"}
Теперь проблема с HTTP запросом. А все потому что у нас нет прав отправлять сообщения в чат с ID 12345678. Изменим ID чата на тот, куда бот имеет право отправить алерт. Сразу же после изменения прилетит сообщение в указанный чат, что Contact Point готов. При этом в логах будет пусто:

Проверим Status ресурса:
$ kubectl get ContactPoint/telegram-cp -o json | jq .status
{
"initialized": true,
"ready": true
}
Contact Point готов!
Реализация контроллера для TrackedField
Теперь, когда контроллер для ContactPoint
работает как нужно, мы можем приступить к реализации нового кастомного ресурса: TrackedField.
По аналогии с типом contact point, мы заложим возможность добавлять новые ресурсы и поля для отслеживания. Но в данной статье мы ограничимся лишь полем replicas
у ресурса Deployment
.
Инициализируем ресурс и контроллер:
$ kubebuilder create api --group tg --version v1 --kind TrackedField
INFO Create Resource [y/n]
y
INFO Create Controller [y/n]
y
INFO Writing kustomize manifests for you to edit...
INFO Writing scaffold for you to edit...
INFO api/v1/trackedfield_types.go
INFO api/v1/groupversion_info.go
INFO internal/controller/suite_test.go
INFO internal/controller/trackedfield_controller.go
INFO internal/controller/trackedfield_controller_test.go
<...>
Опишем структуру для ресурса. Сначала добавим два новых типа: ActionType
и TargetKind
. Первый будет содержать действие над ресурсов (создан, модицифирован или удален), а второй название ресурса (для примера помимо Deployment
добавим еще Service
и StatefulSet
). Еще добавим структуру ResourceTarget
, по полям которой мы будем определять какой ресурс необходимо отслеживать.
type ActionType string
type TargetKind string
const (
ResourceCreated ActionType = "Created"
ResourceUpdated ActionType = "Updated"
ResourceDeleted ActionType = "Deleted"
DeploymentKind TargetKind = "Deployment"
StatefulSetKind TargetKind = "StatefulSet"
ServiceKind TargetKind = "Service"
)
type ResourceTarget struct {
Kind TargetKind `json:"kind"`
Namespace string `json:"namespace"`
Name string `json:"name"`
}
В spec опишем добавим поля для ресурса, которое нужно отслеживать, название ContactPoint
, по которому нужно отправлять алерты и поле, при изменении которого необходимо уведомлять:
type TrackedFieldSpec struct {
Target ResourceTarget `json:"target"`
ContactPoint string `json:"contactPoint"`
Field string `json:"field"`
}
Поле статуса у ресурса будет списком. В него контроллер будет добавлять каждое действие над полем отслеживаемого ресурса:
type TrackedFieldStatus struct {
Time *metav1.Time `json:"time"`
Value string `json:"value,omitempty"`
Action ActionType `json:"action"`
}
TrackedField
и TrackedFieldList
:
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// TrackedField is the Schema for the trackedfields API.
type TrackedField struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec TrackedFieldSpec `json:"spec"`
Status []TrackedFieldStatus `json:"status,omitempty"`
}
// +kubebuilder:object:root=true
// TrackedFieldList contains a list of TrackedField.
type TrackedFieldList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []TrackedField `json:"items"`
}
Сгенерируем CRD и добавим его в кластер:
$ make manifests
$ make install
customresourcedefinition.apiextensions.k8s.io/contactpoints.tg.azamaton.ru unchanged
customresourcedefinition.apiextensions.k8s.io/trackedfields.tg.azamaton.ru created
Приступим к контроллеру. Он находится в файле internal/controller/trackedfield_controller.go
.
Так как мы будем взаимодействовать с некоторыми сторонними ресурсами, то добавим необходимые права доступа к ним из контроллера:
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=trackedfields,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=trackedfields/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=tg.azamaton.ru,resources=trackedfields/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=services/status,verbs=get
Сразу же объявим функцию addNewStatus()
внутри Reconcile, т.к вызывать ее мы будем несколько раз. С помощью нее мы будем добавлять новые элементы в статус ресурса:
func (r *TrackedFieldReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.FromContext(ctx)
addNewStatus := func(trackedField *tgv1.TrackedField, action tgv1.ActionType, value string) error {
newStatus := tgv1.TrackedFieldStatus{
Time: &metav1.Time{Time: time.Now()},
Action: action,
Value: value,
}
trackedField.Status = append(trackedField.Status, newStatus)
if err := r.Update(ctx, trackedField); err != nil {
log.Error(err, "unable to update TrackedField status")
return err
}
return nil
}
<...>
}
Объявим несколько переменных:
// CP для отправки уведомления в чат (tg/webhook/etc)
var contactPointService alert.ContactPoint
// кастомный ресурс, который нужно привести к желаемому состоянию
var trackedField tgv1.TrackedField
// кастомный ресурс ContactPoint
var contactPoint tgv1.ContactPoint
// секрет с API Token для авторизации запросов на отправку уведомлении
var secretWithAPIToken corev1.Secret
// массив со статусом ресурса TrackedField
var trackedFieldStatus []tgv1.TrackedFieldStatus
// ресурс K8s который необходимо отслеживать
var targetResource interface{}
// значение отслеживаемого поля
var valueOfTrackedField string
Получим ресурс TrackedField:
if err := r.Get(ctx, req.NamespacedName, &trackedField); err != nil {
log.V(1).Info("TrackedField was deleted")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
Создадим три объекта NamespacedName: для поиска ресурсов ContactPoint, Secret и отслеживамого ресурса (в нашем случае Deployment). Сразу после объявления targetContactPointLocation
попробуем получить ContactPoint
:
targetContactPointLocation := types.NamespacedName{
Name: trackedField.Spec.ContactPoint,
Namespace: req.Namespace,
}
if err := r.Get(ctx, targetContactPointLocation, &contactPoint); err != nil {
log.V(1).Info("ContactPoint is not found by spec")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
targetResourceLocation := types.NamespacedName{
Name: trackedField.Spec.Target.Name,
Namespace: trackedField.Spec.Target.Namespace,
}
targetSecretLocation := types.NamespacedName{
Name: contactPoint.Spec.ApiToken.SecretName,
Namespace: "habr-k8s-resource-tracker-system",
}
Если ContactPoint
не готов (!Status.Ready
), то вернем ctrl.Result
с полем RequeueAfter: time.Minute
. Это укажет оператору запустить процесс reconciliation через минуту. Это нужно для того, чтобы если вдруг ContactPoint
будет готов отправлять уведомления, то пользователю не пришлось пересоздавать ресурс TrackedField
.
if !contactPoint.Status.Ready {
log.V(1).Info("Contact point is not ready")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
Получим секрет c токеном как мы это делали ранее в контроллере ContactPoint
:
if err := r.Get(ctx, targetSecretLocation, &secretWithAPIToken); err != nil {
log.Error(err, "unable to get secret with api token by spec")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
Переменная targetResource
не просто так имеет тип interface{}
. Это означает, что она может содержать значение любого типа. На основе значения поля TrackedField.Spec.Target.Kind
мы будем записывать в данную переменную тип ресурса:
switch trackedField.Spec.Target.Kind {
case tgv1.DeploymentKind:
targetResource = &v1.Deployment{}
case tgv1.ServiceKind:
targetResource = &corev1.Service{}
}
Получим нужный ContactPoint
при помощи ранее объявленной функции alert.GetContactPointByType
:
contactPointService = alert.GetContactPointByType(contactPoint,
string(secretWithAPIToken.Data[contactPoint.Spec.ApiToken.Key]))
Попробуем преобразовать targetResource
в выбранный на основе switch/case тип:
obj, isOk := targetResource.(client.Object)
if !isOk {
return ctrl.Result{}, errors.New("cannot cast target kind to resource")
}
Обработаем сценарии, когда отслеживаемый ресурс не найден. Если в последнем элементе статуса мы находим значение Action == Deleted
, то ничего не изменяем, иначе добавляем новый статус, указывая, что теперь ресурс Deleted
и отправляем уведомление в чат об этом. В любом случае указываем RequeueAfter
в минуту:
trackedFieldStatus = trackedField.Status
if err := r.Get(ctx, targetResourceLocation, obj); err != nil {
log.Error(err, "unable to get resource by TrackedField spec")
lastStatus := trackedFieldStatus[len(trackedFieldStatus)-1]
if lastStatus.Action == tgv1.ResourceDeleted {
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
addNewStatus(&trackedField, tgv1.ResourceDeleted, "")
_, err = contactPointService.SendAlert("Resource kind: " + string(trackedField.Spec.Target.Kind) + "\n" +
"Name: " + trackedField.Spec.Target.Name + "\n" +
"Status: " + string(tgv1.ResourceDeleted) + "\n" +
"Value: " + "-1")
return ctrl.Result{RequeueAfter: time.Minute}, nil
}
Теперь добавим switch/case, но для получения значения отслеживаемого поля. Если Field
равен replicas
или image
, то будем получать значения данных полей из Deployment
, targetPort
- из Service
:
switch trackedField.Spec.Field {
case "replicas":
valueOfTrackedField = strconv.Itoa(int(*obj.(*v1.Deployment).Spec.Replicas))
case "image":
valueOfTrackedField = obj.(*v1.Deployment).Spec.Template.Spec.Containers[0].Image
case "targetPort":
valueOfTrackedField = obj.(*corev1.Service).Spec.Ports[0].TargetPort.StrVal
}
Теперь сделаем проверку на то, является ли поле новым (нет предыдущего статуса) или отличается ли его текущее значение от последнего записанного. Если одно из этих условий верно, контроллер приступает к обработке изменения.
По умолчанию действие над полем устанавливается в ResourceCreated
. Но если существует предыдущий статус и его action не равен ResourceDeleted
, то это означает, что поле было обновлено. Поэтому мы перезаписываем action = ResourceUpdated
. В любом случае добавляется новый статус в массив Status. Если обновление ресурса произошло без ошибок, то отправляем уведомление в чат с записанным action и значением отслеживаемого поля:
if len(trackedFieldStatus) < 1 ||
(trackedFieldStatus[len(trackedFieldStatus)-1].Value != valueOfTrackedField) {
action := tgv1.ResourceCreated
if len(trackedFieldStatus) > 0 {
lastStatus := trackedFieldStatus[len(trackedFieldStatus)-1]
if lastStatus.Action != tgv1.ResourceDeleted {
action = tgv1.ResourceUpdated
}
}
err := addNewStatus(&trackedField, action, valueOfTrackedField)
if err != nil {
return ctrl.Result{}, err
}
_, err = contactPointService.SendAlert("Resource kind: " + string(trackedField.Spec.Target.Kind) + "\n" +
"Name: " + trackedField.Spec.Target.Name + "\n" +
"Status: " + string(action) + "\n" +
"Value: " + valueOfTrackedField)
if err != nil {
return ctrl.Result{}, err
}
}
return ctrl.Result{RequeueAfter: time.Minute}, nil
На этом моменте контроллер для TrackedField готов. Обновим оператор в кластере и гоу тестировать!
$ make docker-build docker-push IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.2
$ make deploy IMG=azamatkomaev/habr-k8s-resource-tracker-operator:1.0.2
Запуск оператора
Оставим ContactPoint нетронутым. В файл config/samples/tg_v1_trackedfield.yaml
добавим спецификацию TrackedField:
apiVersion: tg.azamaton.ru/v1
kind: TrackedField
metadata:
labels:
app.kubernetes.io/name: habr-k8s-resource-tracker
app.kubernetes.io/managed-by: kustomize
name: nginx-deployment-replicas
spec:
contactPoint: telegram-chat
field: replicas
target:
kind: Deployment
name: nginx
namespace: default
В ту же директорию добавим файл deploy.yaml с ресурсом Deployment с одной репликой и образом nginx:
apiVersion: apps/v1
kind: Deployment
metadata:
name: nginx
namespace: default
labels:
app: nginx
spec:
replicas: 1
selector:
matchLabels:
app: nginx
template:
metadata:
labels:
app: nginx
spec:
containers:
- name: nginx
image: nginx:latest
ports:
- containerPort: 80
Сначала создадим секрет с API Token и добавим Deployment и ContactPoint:
$ kubectl -n habr-k8s-resource-tracker-system create secret generic telegram-cp-token \
--from-literal=token="12345678:ABCDEFUABCDEFUABCDEFUABCDEFU"
secret/telegram-cp-token created
$ kubectl apply -f ./config/samples/deploy.yaml -f ./config/samples/tg_v1_contactpoint.yaml
deployment.apps/nginx created
contactpoint.tg.azamaton.ru/telegram-cp created
В чат моментально пришло уведомление о готовности cp. Теперь добавим TrackedField:
$ kubectl apply -f ./config/samples/tg_v1_trackedfield.yaml
trackedfield.tg.azamaton.ru/nginx-deployment-replicas created
В чат пришло уведомление о том, что был создан соответствующий ресурс со значением replicas: 1
:

Поле со статусом так же было обновлено:
$ kubectl get TrackedField/nginx-deployment-replicas -o json | jq .status
[
{
"action": "Created",
"time": "2025-02-11T20:44:21Z",
"value": "1"
}
]
Увеличим кол-во реплик до 3:
$ kubectl scale deployment nginx --replicas 3
deployment.apps/nginx scaled
На этот раз алерт может прийти не мгновенно, потому что мы сами указали RequeueAfter: time.Minute
. Поэтому подождем около минуты и получим результат:

$ kubectl get TrackedField/nginx-deployment-replicas -o json | jq .status
[
{
"action": "Created",
"time": "2025-02-11T20:44:21Z",
"value": "1"
},
{
"action": "Updated",
"time": "2025-02-11T20:51:04Z",
"value": "3"
}
]
Удалим Deployment. Немного терпения и снова уведомление и обновление статуса!
$ kubectl get TrackedField/nginx-deployment-replicas -o json | jq .status
[
{
"action": "Created",
"time": "2025-02-11T20:44:21Z",
"value": "1"
},
{
"action": "Updated",
"time": "2025-02-11T20:51:04Z",
"value": "3"
},
{
"action": "Deleted",
"time": "2025-02-11T20:54:04Z"
}
]
Конец
В данной статье я постарался показать как можно создать свой собственный простенький оператор для Kubernetes. Конечно, kubebuilder достаточно мощный инструмент и, возможно, избыточен для решения задачи, которую мы поставили и решили написанием данного оператора. Но я доволен тем, что смог «пощупать» возможности kubebuilder‑а сам, на своем собственном операторе, что может быть полезно в будущем. Всем удачи! =)