Привет, Хабр!
Есть классическая боль очередей: скейлинг по факту отставания. Пока backlog вырос, пока HPA дотянулся, пока новые pod«ы прогрелись — SLO уже упал. Решение напрашивается: считать не сколько наваливается прямо сейчас, а сколько нужно серверов, чтобы вероятность ждать больше T была ниже целевого порога. Ровно это умеет Erlang‑C пришедший из жизни колл‑центров. Берём — интенсивность входа,
— среднюю производительность одного воркера, целевой сервис‑левел по ожиданию в очереди, и получаем требуемое число агентов c. Дальше превращаем это в desired replicas и отдаём в KEDA через External Scaler поверх gRPC. Получается предиктивный автоскейлинг, привязанный к SLO, а не к догоняющим метрикам.
Erlang-C как инструмент для SLO по ожиданию
Работаем в стандартной модели M/M/c: пуассоновский вход, экспоненциальные сервис‑таймы, c независимых серверов, бесконечная очередь. Тогда вероятность того, что пришедший попадёт ждать, выражается формулой Erlang‑C. Пусть — предложенная нагрузка в ерлангах. Тогда
Это вероятность того, что клиент будет ждать начала обслуживания. Это — классика модели M/M/c.
Дальше важная связь с SLO: распределение времени ожидания условно на событие «пришлось ждать» — экспоненциальное с параметром . Значит неусловная вероятность уложиться в порог
такова:
Сервис‑левел по ожиданию до — это и есть формула выше
Цель: найти минимальное cc, при котором при заданных TT и целевом уровне
. Условия устойчивости обязательны:
. Оговорка адекватности: Erlang‑C переоценивает ожидание, если клиенты отваливаются из очереди — в таком случае модель Erlang‑A точнее, а C будет консервативной.
Как превратить это в контроллер мощностей
Шаги простые и повторяемые:
Оценить или спрогнозировать λ на горизонте, который соответствует времени прогрева/холод‑старту и желаемому SLO‑окну. Это можно сделать PromQL функциями
predict_linear
или сглаживаниемdouble_exponential_smoothing
. В актуальной документации Prometheusholt_winters
переименован вdouble_exponential_smoothing
, аpredict_linear
— линейная регрессия на окне.Оценить μ из метрик сервиса. Надёжно брать средний сервис‑тайм за окно и инвертировать:
.
По
посчитать минимальное cc бинарным поиском по формуле выше.
Отдать cc как метрику, из которой HPA без лишних трюков получит desired replicas. В KEDA у
ScaledObject
для external‑метрик тип по умолчаниюAverageValue
, и HPA тогда целится в «глобальная метрика / targetAverageValue». Если положить targetAverageValue = 1, то desired replicas будет равен значению метрики.
Реализация KEDA External Scaler на Go
KEDA ходит к внешнему gRPC‑серверу и вызывает четыре метода: IsActive
, StreamIsActive
, GetMetricSpec
, GetMetrics
. Сигнатуры описаны в externalscaler.proto
, а в документации KEDA показано, как именно KEDA их использует. Мы реализуем «pull»‑вариант без стрима.
Ниже заготовка. Она:
читает из
ScaledObject.metadata
PromQL‑запросы для λ и среднего сервис‑тайма;прогнозирует λ на заданный горизонт;
считает cc по Erlang‑C с бинарным поиском;
фиксирует анти‑дребезг: scale down медленнее scale up, гистерезис по допуску ε;
публикует прометеевые метрики самого скейлера;
поддерживает TLS для gRPC по флагам.
// cmd/scaler/main.go
package main
import (
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"math"
"net"
"net/http"
"os"
"strconv"
"time"
pb "github.com/kedacore/keda/pkg/scalers/externalscaler"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type server struct {
pb.UnimplementedExternalScalerServer
httpc *http.Client
logger *log.Logger
now func() time.Time
// simple state for hysteresis
lastC int
lastChange time.Time
scaleDownHold time.Duration
scaleUpMinStep int
epsilonSL float64
minC, maxC int
}
func main() {
addr := flag.String("listen", ":9090", "gRPC listen addr")
scaleDownHold := flag.Duration("scale-down-hold", 2*time.Minute, "hold time before scaling down")
scaleUpMinStep := flag.Int("scale-up-min-step", 1, "minimal scale up step")
useTLS := flag.Bool("tls", false, "enable TLS")
certFile := flag.String("tls-cert", "", "TLS cert")
keyFile := flag.String("tls-key", "", "TLS key")
flag.Parse()
s := &server{
httpc: &http.Client{
Timeout: 5 * time.Second,
},
logger: log.New(os.Stdout, "erlangc-scaler ", log.LstdFlags|log.Lmsgprefix),
now: time.Now,
scaleDownHold: *scaleDownHold,
scaleUpMinStep: *scaleUpMinStep,
epsilonSL: 0.01,
minC: 1,
maxC: 10000,
}
var srv *grpc.Server
if *useTLS {
creds, err := credentials.NewServerTLSFromFile(*certFile, *keyFile)
if err != nil {
log.Fatalf("tls: %v", err)
}
srv = grpc.NewServer(grpc.Creds(creds))
} else {
srv = grpc.NewServer()
}
pb.RegisterExternalScalerServer(srv, s)
lis, err := net.Listen("tcp", *addr)
if err != nil {
log.Fatal(err)
}
log.Printf("listening on %s", *addr)
if err := srv.Serve(lis); err != nil {
log.Fatal(err)
}
}
func (s *server) IsActive(ctx context.Context, in *pb.ScaledObjectRef) (*pb.IsActiveResponse, error) {
// активируемся, если предсказанный λ > 0 и требуемое c >= minC
res, _, err := s.compute(ctx, in)
active := false
if err == nil && res >= int32(s.minC) {
active = true
}
return &pb.IsActiveResponse{Result: active}, nil
}
func (s *server) GetMetricSpec(ctx context.Context, in *pb.ScaledObjectRef) (*pb.GetMetricSpecResponse, error) {
// HPA должен интерпретировать метрику как AverageValue = 1,
// значит desiredReplicas = metricValue
return &pb.GetMetricSpecResponse{
MetricSpecs: []*pb.MetricSpec{
{
MetricName: "erlangc_required_replicas",
TargetSize: 1, // AverageValue=1
},
},
}, nil
}
func (s *server) GetMetrics(ctx context.Context, in *pb.GetMetricsRequest) (*pb.GetMetricsResponse, error) {
cReq, sl, err := s.compute(ctx, in.ScaledObjectRef)
if err != nil {
return nil, err
}
// гистерезис: не даём дёргаться вниз чаще, чем раз в scaleDownHold
now := s.now()
cOut := int(cReq)
if s.lastC > 0 {
if cOut < s.lastC {
if now.Sub(s.lastChange) < s.scaleDownHold {
cOut = s.lastC
}
} else if cOut > s.lastC && cOut-s.lastC < s.scaleUpMinStep {
cOut = s.lastC + s.scaleUpMinStep
}
}
if cOut != s.lastC {
s.lastC = cOut
s.lastChange = now
}
s.logger.Printf("c*=%d, predictedSL=%.3f", cOut, sl)
return &pb.GetMetricsResponse{
MetricValues: []*pb.MetricValue{{
MetricName: "erlangc_required_replicas",
MetricValue: int64(cOut),
}},
}, nil
}
func (s *server) compute(ctx context.Context, in *pb.ScaledObjectRef) (int32, float64, error) {
md := in.ScalerMetadata
promURL := md["prometheusURL"]
if promURL == "" {
return 0, 0, errors.New("prometheusURL is required")
}
arrQ := md["arrivalRateQuery"] // should return λ [1/s]
serQ := md["serviceTimeQuery"] // should return average service time [s]
if arrQ == "" || serQ == "" {
return 0, 0, errors.New("arrivalRateQuery and serviceTimeQuery are required")
}
targetSL, _ := strconv.ParseFloat(md["targetSL"], 64) // e.g. 0.95
if targetSL <= 0 || targetSL >= 1 {
targetSL = 0.95
}
T, _ := strconv.ParseFloat(md["waitThresholdSeconds"], 64) // e.g. 1.0
if T <= 0 {
T = 1.0
}
lmbd, err := s.instantQuery(ctx, promURL, arrQ)
if err != nil {
return 0, 0, fmt.Errorf("arrival rate query: %w", err)
}
svc, err := s.instantQuery(ctx, promURL, serQ)
if err != nil {
return 0, 0, fmt.Errorf("service time query: %w", err)
}
if svc <= 0 || lmbd < 0 {
return 0, 0, errors.New("invalid metrics")
}
mu := 1.0 / svc
// нижняя граница: хотя бы ceil(λ/μ)
lb := int(math.Ceil(lmbd / mu))
if lb < s.minC {
lb = s.minC
}
ub := lb
// расширяем верхнюю границу, пока SL не выполнится или пока не упрёмся
for ; ub <= s.maxC; ub *= 2 {
sl := serviceLevel(lmbd, mu, float64(ub), T)
if sl >= targetSL {
break
}
if ub == 0 {
ub = 1
}
}
if ub > s.maxC {
return int32(s.maxC), serviceLevel(lmbd, mu, float64(s.maxC), T), nil
}
// бинарный поиск
best := ub
for lb <= ub {
m := (lb + ub) / 2
sl := serviceLevel(lmbd, mu, float64(m), T)
if sl >= targetSL {
best = m
ub = m - 1
} else {
lb = m + 1
}
}
return int32(best), serviceLevel(lmbd, mu, float64(best), T), nil
}
func (s *server) instantQuery(ctx context.Context, base, q string) (float64, error) {
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/api/v1/query?query=%s", base, urlQueryEscape(q)), nil)
if err != nil {
return 0, err
}
resp, err := s.httpc.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
var out struct {
Status string `json:"status"`
Data struct {
ResultType string `json:"resultType"`
Result []struct {
Value [2]any `json:"value"`
} `json:"result"`
} `json:"data"`
}
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return 0, err
}
if out.Status != "success" || len(out.Data.Result) == 0 {
return 0, errors.New("no data")
}
valStr, _ := out.Data.Result[0].Value[1].(string)
return strconv.ParseFloat(valStr, 64)
}
// вероятность ждать
func erlangC(a float64, c int) float64 {
if c <= 0 || a <= 0 {
return 0
}
// устойчивое вычисление через рекуррентные коэффициенты
term := 1.0
sum := 1.0
for k := 1; k <= c-1; k++ {
term *= a / float64(k)
sum += term
}
termC := term * a / float64(c) // a^c / c!
if a >= float64(c) {
// перегрузка — формально неустойчиво, возвращаем 1
return 1.0
}
num := termC * float64(c) / (float64(c) - a)
return num / (sum + num)
}
func serviceLevel(lambda, mu, c float64, T float64) float64 {
if c <= 0 {
return 0
}
Pw := erlangC(lambda/mu, int(c))
gap := c*mu - lambda
if gap <= 0 {
return 0
}
return 1 - Pw*math.Exp(-gap*T)
}
// простая экранизация без внешних зависимостей
func urlQueryEscape(q string) string {
r := ""
for i := 0; i < len(q); i++ {
ch := q[i]
switch ch {
case ' ':
r += "%20"
case '"':
r += "%22"
case '+':
r += "%2B"
case '%':
r += "%25"
case '&':
r += "%26"
default:
r += string(ch)
}
}
return r
}
Манифесты: деплой скейлера и ScaledObject
Разворачиваем Deployment с нашим gRPC‑сервисом. Если нужен TLS, добавляем секрет с ключом и сертификатом и включаем флаги. ScaledObject настраиваем так, чтобы HPA воспринимал возвращаемую метрику как AverageValue = 1
. В metadata
кладём PromQL‑запросы для λ и среднего сервис‑тайма.
apiVersion: apps/v1
kind: Deployment
metadata:
name: erlangc-scaler
spec:
replicas: 1
selector:
matchLabels: { app: erlangc-scaler }
template:
metadata:
labels: { app: erlangc-scaler }
spec:
containers:
- name: scaler
image: ghcr.io/org/erlangc-scaler:1.0.0
args:
- --listen=:9090
- --scale-down-hold=120s
- --scale-up-min-step=2
ports:
- name: grpc
containerPort: 9090
readinessProbe:
tcpSocket: { port: 9090 }
initialDelaySeconds: 2
periodSeconds: 5
livenessProbe:
tcpSocket: { port: 9090 }
initialDelaySeconds: 10
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: erlangc-scaler
spec:
selector:
app: erlangc-scaler
ports:
- name: grpc
port: 9090
targetPort: 9090
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: queue-worker-erlangc
spec:
scaleTargetRef:
name: queue-worker
pollingInterval: 30
cooldownPeriod: 300
minReplicaCount: 0
maxReplicaCount: 200
advanced:
horizontalPodAutoscalerConfig:
behavior:
# быстро вверх, аккуратно вниз
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
triggers:
- type: external
metricType: AverageValue
metadata:
scalerAddress: erlangc-scaler.default.svc.cluster.local:9090
prometheusURL: http://prometheus-server.monitoring.svc.cluster.local
# прогноз λ на 60 с вперёд по регрессии на последнем часу
arrivalRateQuery: sum(predict_linear(sum(rate(queue_in_total[5m]))[1h:5m], 60))
# средний сервис-тайм по сумме/счётчику
serviceTimeQuery: (sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m])))
waitThresholdSeconds: "1"
targetSL: "0.95"
Семантика external‑метрик и HPA‑формулы подтверждена в официальных доках Kubernetes и KEDA. Обратите внимание на behavior.scaleDown.stabilizationWindowSeconds
— дефолтное значение 300 с подавляет флаппинг при схлопывании, это нормальная практика.
Генератор нагрузки и эталонный воркер
Чтобы воспроизвести поведение M/M/c, возьмём Redis Lists как очередь, генератор с пуассоновским входом и воркер, который моделирует экспоненциальный сервис‑тайм с заданным средним. Нагрузчик просто пушит JSON‑сообщения в лист, воркер блокирующе читает и обрабатывает.
// cmd/loadgen/main.go
package main
import (
"context"
"flag"
"log"
"math/rand"
"time"
"github.com/redis/go-redis/v9"
)
func main() {
addr := flag.String("redis", "redis:6379", "redis addr")
queue := flag.String("queue", "jobs", "queue name")
lambda := flag.Float64("lambda", 10, "arrival rate per second")
flag.Parse()
rdb := redis.NewClient(&redis.Options{Addr: *addr})
ctx := context.Background()
src := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
// экспоненциальные межприходы
u := src.Float64()
wait := -mathLog(u) / *lambda
time.Sleep(time.Duration(wait * float64(time.Second)))
err := rdb.LPush(ctx, *queue, time.Now().UnixNano()).Err()
if err != nil {
log.Printf("LPUSH: %v", err)
}
}
}
func mathLog(u float64) float64 { return -1 * (math.Log(1-u)) }
// cmd/worker/main.go
package main
import (
"context"
"flag"
"log"
"math"
"math/rand"
"time"
"github.com/redis/go-redis/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"net/http"
)
var (
jobDur = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "job_duration_seconds",
Help: "service time per job",
Buckets: prometheus.DefBuckets,
})
processed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "queue_processed_total",
Help: "jobs processed",
})
)
func main() {
addr := flag.String("redis", "redis:6379", "redis addr")
queue := flag.String("queue", "jobs", "queue name")
mean := flag.Float64("mean", 0.2, "mean service time in seconds")
flag.Parse()
prometheus.MustRegister(jobDur, processed)
go func() {
http.Handle("/metrics", promhttp.Handler())
_ = http.ListenAndServe(":8080", nil)
}()
rdb := redis.NewClient(&redis.Options{Addr: *addr})
ctx := context.Background()
src := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
res, err := rdb.BRPop(ctx, 0, *queue).Result()
if err != nil {
log.Printf("BRPOP: %v", err)
time.Sleep(time.Second)
continue
}
start := time.Now()
// экспоненциальный сервис-тайм
u := src.Float64()
t := -math.Log(1-u) * (*mean)
time.Sleep(time.Duration(t * float64(time.Second)))
jobDur.Observe(time.Since(start).Seconds())
processed.Inc()
}
}
С такими метриками Prometheus‑запросы в ScaledObject становятся простыми: arrivalRateQuery = sum(rate(queue_in_total[5m]))
или используйте queue_enqueued_total
в своём стеке. serviceTimeQuery = sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m]))
. Подчеркну: predict_linear
подходит для gauges, для скоростей используйте подход через субквест [...]
.
Дэшборды: что мониторить
Хочется видеть одновременно прогноз , оценку
, расчётное
, фактические реплики и сервис‑левел. Один из минимальных дашбордов можно собрать так:
Панель 1:
sum(rate(queue_in_total[5m]))
иpredict_linear(sum(rate(queue_in_total[5m]))[1h:5m], 60)
— текущая и прогнозная интенсивность.Панель 2:
1 / (sum(rate(job_duration_seconds_sum[5m])) / sum(rate(job_duration_seconds_count[5m])))
—.
Панель 3: внешняя метрика
erlangc_required_replicas
.Панель 4: расчётный сервис‑левел панелью Stat, вычисляем в скейлере и экспортируем или прикидываем в PromQL аналитику, если метрики доступны.
Панель 5:
kube_deployment_status_replicas
по целевому Deployment.
Минимальный JSON‑фрагмент панели Stat для :
{
"type": "stat",
"title": "Required replicas (Erlang-C)",
"targets": [
{
"expr": "erlangc_required_replicas",
"legendFormat": "c*"
}
],
"options": {
"reduceOptions": { "calcs": ["lastNotNull"], "values": false }
}
}
Устойчивость
Даже идеальная модель может заставить HPA дёргаться, если входные оценки шумные. Контур стабилизации в двух местах:
На стороне HPA через
behavior
: стабилизационное окно на scale down по умолчанию 300 секунд и ограничение скорости изменений. Это штатная возможность autoscaling/v2.В самом скейлере: держим минимальный шаг апскейла, откладываем даунскейл на
scaleDownHold
, накладываем допуск ε на целевой SLO. Это снижает чувствительность к лёгким промахам в λ/μ.
При желании можно добавить EMA к и
или перейти на PromQL
double_exponential_smoothing
, но помните, что это сглаживание, а не точное предсказание.
Что в итоге получается
Мы переводим SLO по ожиданию в очереди в конкретное число реплик через Erlang‑C, используя текущие и прогнозные метрики. Это прозрачная математика плюс аккуратный контур стабилизации. Профит — меньше промахов по SLO, меньше догоняющих апскейлов при всплесках, контролируемая стоимость. А главное — логика автоскейлинга перестаёт зависеть от конкретного брокера, потому что мы подаём в скейлер не размер очереди, а первичные параметры потока и сервиса.
В итоге предиктивный автоскейлинг через Erlang‑C и KEDA позволяет проектировать систему не «по факту перегрузки», а исходя из формализованных требований к уровню сервиса. Это уже уровень архитектурных решений, где математика, распределённые системы и практики эксплуатации должны работать вместе.
Если вы хотите глубже разобраться в том, как строить подобные механизмы в продакшн‑среде, рекомендуем обратить внимание на курс Highload Architect. Там вы сможете изучить подходы к проектированию систем, где подобные методы масштабирования — не исключение, а правило. Чтобы узнать, подойдет ли вам программа курса, пройдите вступительный тест.
Рост в IT быстрее с Подпиской — дает доступ к 3-м курсам в месяц по цене одного. Подробнее
gis
А что значит "SLO" в контексте вашей статьи?