В нашей предыдущей статье про приключения сервера Kubernetes API мы рассмотрели интерфейс хранилища и исследовали единственную реализацию в дереве: etcd3. Однако внимательное прочтение сносок в этом посте показало, что мы были не совсем честными, говоря, что etcd3 — единственная реализация.
Это можно оспорить, ведь Cacher также может быть технической реализацией, хотя ему и требуется базовая реализация. Подробно рассмотрим кэширование в одном из будущих постов. А сейчас подробно изучим кэширование и его возможности.
О чём поговорим в статье:
Наблюдение (???? «Я хочу проблему и ее решение»);
Все смотрят;
Кэширование (???? «Мне просто нужно решение»);
watchCache
;cacherListerWatcher
;Reflector
;Свяжем всё вместе;
Метод
Watch()
;Все смотрят… Успешно (???? «Просто покажите мне код»).
Наблюдение
Прежде чем мы введем какой-либо тип кэширования, стоит вернуться к тому факту, что storage.Interface
включает в себя метод Watch
, реализация etcd3
поддерживает его. Ранее мы изучали возможность отслеживания изменений, обращаясь непосредственно к etcd
с помощью etcdctl
. etcd
предлагает Watch API, который использует двунаправленные потоки gRPC для доставки событий клиентам при изменении значения для ключа.
Реализация etcd3 Watch создает watch.Interface
, поддерживаемый watcher
. Новый watcher
создается при каждом вызове Watch
, и каждый watcher
, по сути, просто вызывает etcd
Watch API и пересылает события по каналу.
Давайте возьмем нашу программу из предыдущего поста и модифицируем её, чтобы отслеживать изменения во всех ConfigMaps
в кластере.
Вы можете найти инструкции о том, как получить данные PCI, необходимые для связи с etcd
в кластере kind
, здесь.
directwatch.go
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
"k8s.io/kubernetes/pkg/apis/core"
k8sv1 "k8s.io/kubernetes/pkg/apis/core/v1"
)
func main() {
tlsConfig, err := (transport.TLSInfo{
CertFile: "./pki/etcd/server.crt",
KeyFile: "./pki/etcd/server.key",
TrustedCAFile: "./pki/etcd/ca.crt",
ClientCertFile: "./pki/apiserver-etcd-client.crt",
ClientKeyFile: "./pki/apiserver-etcd-client.key",
}).ClientConfig()
if err != nil {
panic(err)
}
c, err := clientv3.New(clientv3.Config{
Endpoints: []string{"https://127.0.0.1:2379"},
TLS: tlsConfig,
})
if err != nil {
panic(err)
}
scheme := runtime.NewScheme()
k8sv1.AddToScheme(scheme)
core.AddToScheme(scheme)
f := serializer.NewCodecFactory(scheme)
s := etcd3.New(c, f.CodecForVersions(nil, f.UniversalDecoder(), nil, k8sv1.SchemeGroupVersion), nil, "registry", v1.Resource("ConfigMap"), identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
w, err := s.Watch(context.Background(), "configmaps", storage.ListOptions{
Predicate: storage.Everything,
Recursive: true,
})
if err != nil {
panic(err)
}
for e := range w.ResultChan() {
co, ok := e.Object.(*v1.ConfigMap)
if !ok {
panic("not a config map!")
}
fmt.Printf("%s || %s/%s\n", e.Type, co.Namespace, co.Name)
}
}
Обратите внимание на тот факт, что мы регистрируем типы из k8s.io/kubernetes
скорее, чем k8s.io/api
. Это нужно, чтобы выполнить преобразование internal
версии в нашу желаемую версию (v1
). Мы также передаем nil CodecForVersions,
поскольку декодируем их только при чтении событий потока.
Если у вас запущен кластер kind
, вы можете начать перенаправление портов etcd Pod
.
$ kubectl port-forward -n kube-system pod/etcd-kind-control-plane 2379:2379
Теперь мы можем начать нашу программу.
$ go run directwatch.go
ADDED || default/kube-root-ca.crt
ADDED || kube-node-lease/kube-root-ca.crt
ADDED || kube-public/cluster-info
ADDED || kube-public/kube-root-ca.crt
ADDED || kube-system/coredns
ADDED || kube-system/extension-apiserver-authentication
ADDED || kube-system/kube-proxy
ADDED || kube-system/kube-root-ca.crt
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubelet-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config
Как и ожидалось, мы получаем ADDED
события для всех ConfigMaps
, находящихся в данный момент в кластере. Можно создавать, обновлять и удалять ConfigMap
, чтобы увидеть, как другие события отражаются в потоке просмотра.
$ kubectl create configmap k8s-asa --from-literal hello=world
configmap/k8s-asa created
$ kubectl create configmap k8s-asa --from-literal hello=asa -o yaml --dry-run | kubectl apply -f -
configmap/k8s-asa configured
$ kubectl delete configmap k8s-asa
configmap "k8s-asa" deleted
Посмотрим на каждое из этих событий в потоке просмотра с соответствующим типом.
ADDED || default/k8s-asa
MODIFIED || default/k8s-asa
DELETED || default/k8s-asa
Общее наблюдение
Теперь мы можем получать события в любое время, когда меняется интересующий нас тип, и не похоже, что etcd
или наша маленькая программа сильно «потеют». Но давайте взглянем на некоторые показатели etcd
, чтобы убедиться в этом. Мы можем найти открытый адрес метрик для etcd
в нашем кластере kind
, просмотрев команду для контейнера в модуле Pod
.
$ kubectl get pod -n kube-system etcd-kind-control-plane -o=jsonpath={.spec.containers[0].command} | jq .
[
"etcd",
"--advertise-client-urls=https://172.19.0.2:2379",
"--cert-file=/etc/kubernetes/pki/etcd/server.crt",
"--client-cert-auth=true",
"--data-dir=/var/lib/etcd",
"--experimental-initial-corrupt-check=true",
"--experimental-watch-progress-notify-interval=5s",
"--initial-advertise-peer-urls=https://172.19.0.2:2380",
"--initial-cluster=kind-control-plane=https://172.19.0.2:2380",
"--key-file=/etc/kubernetes/pki/etcd/server.key",
"--listen-client-urls=https://127.0.0.1:2379,https://172.19.0.2:2379",
"--listen-metrics-urls=http://127.0.0.1:2381",
"--listen-peer-urls=https://172.19.0.2:2380",
"--name=kind-control-plane",
"--peer-cert-file=/etc/kubernetes/pki/etcd/peer.crt",
"--peer-client-cert-auth=true",
"--peer-key-file=/etc/kubernetes/pki/etcd/peer.key",
"--peer-trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt",
"--snapshot-count=10000",
"--trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt"
]
--listen-metrics-urls
— то, что мы ищем .etcd
будет обслуживать показатели Prometheus в конечной точке /metrics
по этому адресу. Создаём минимальный конфигурационный файл Prometheus, чтобы дать команду сборщику выполнить очистку etcd
.
prometheus.yml
global:
scrape_interval: 1s
scrape_configs:
- job_name: etcd
static_configs:
- targets: ['127.0.0.1:2379']
Прежде чем мы запустим Prometheus, убедитесь, что можно получить доступ к конечной точке etcd metrics
. Перенаправление портов в пространство имен host network
позволит запускать Prometheus в пространстве имен host network
, что упрощает доступ к панели мониторинга в браузере.
Перенесите конечную точку etcd metrics
.
$ kubectl port-forward -n kube-system pod/etcd-kind-control-plane 2381:2381
Затем запустите контейнер Prometheus, подключённый к сети хоста.
$ docker run --net=host --rm -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus
Панель мониторинга Prometheus теперь должна быть доступна в браузере по адресу 127.0.0.1:9090
. Мы можем получить график показателей, которые предоставляет etcd
, введя запрос. Например, irate(process_cpu_seconds_total{job="etcd"}[5m])
покажет скорость увеличения общего системного и пользовательского процессорного времени. Если запустить программу сейчас, которая устанавливает просмотры для ConfigMaps
, мы не увидим существенного влияния на процессор.
Диапазон по оси Y составляет от 0.03
до 0.23
.
Примечание: Поскольку мы запускаем etcd
в кластере kind
, сервер API Kubernetes и другие компоненты взаимодействуют с ним в дополнение к нагрузке от программы.
Этого и следовало ожидать: одни часы вряд ли окажут существенное влияние. Но если мы увеличим количество часов, то начнем сталкиваться с проблемами. Давайте настроим нашу программу так, чтобы она запускала 10 000 просмотров на ConfigMaps
.
manydirectwatch.go
package main
import (
"context"
"fmt"
"os"
"os/signal"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
"k8s.io/kubernetes/pkg/apis/core"
k8sv1 "k8s.io/kubernetes/pkg/apis/core/v1"
)
func main() {
tlsConfig, err := (transport.TLSInfo{
CertFile: "./pki/etcd/server.crt",
KeyFile: "./pki/etcd/server.key",
TrustedCAFile: "./pki/etcd/ca.crt",
ClientCertFile: "./pki/apiserver-etcd-client.crt",
ClientKeyFile: "./pki/apiserver-etcd-client.key",
}).ClientConfig()
if err != nil {
panic(err)
}
c, err := clientv3.New(clientv3.Config{
Endpoints: []string{"https://127.0.0.1:2379"},
TLS: tlsConfig,
})
if err != nil {
panic(err)
}
scheme := runtime.NewScheme()
k8sv1.AddToScheme(scheme)
core.AddToScheme(scheme)
f := serializer.NewCodecFactory(scheme)
s := etcd3.New(c, f.CodecForVersions(nil, f.UniversalDecoder(), nil, k8sv1.SchemeGroupVersion), nil, "registry", v1.Resource("ConfigMap"), identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
for i := 0; i < 10000; i++ {
w, err := s.Watch(context.Background(), "configmaps", storage.ListOptions{
Predicate: storage.Everything,
Recursive: true,
})
if err != nil {
panic(err)
}
go func() {
for e := range w.ResultChan() {
co, ok := e.Object.(*v1.ConfigMap)
if !ok {
panic("not a config map!")
}
fmt.Printf("%s || %s/%s\n", e.Type, co.Namespace, co.Name)
}
}()
}
stop := make(chan os.Signal)
signal.Notify(stop, os.Interrupt)
<-stop
}
Если мы запустим программу сейчас, то должны увидеть, что все 10 000 просмотров получают список всех ConfigMaps
в кластере. Когда создадим новую ConfigMap
, мы сможем увидеть, что все часы получают уведомления.
$ go run manysimplewatch.go
...
ADDED || kube-system/kube-proxy
ADDED || kube-system/kube-root-ca.crt
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubelet-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config
ADDED || kube-system/kube-root-ca.crt
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubelet-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config
...
$ kubectl create configmap testing
configmap/testing created
...
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
...
Оглядываясь назад на наши показатели etcd
, мы видим значительный скачок производительности процессора при запуске программы. Это вызывает беспокойство, но, возможно, еще более неприятным является объем работы, который был проделан при создании новой ConfigMap
. Пока нагрузка моделируется в программе, сервер Kubernetes API обслуживает запросы на просмотр от различных клиентов. Устанавливать наблюдение в etcd
для каждого из них нет необходимости, ведь серверу API требуется только одно уведомление, чтобы проинформировать всех клиентов.
Диапазон значений по оси Y составляет от 0,00
до 3,00
.
Первый большой всплеск соответствует моменту запуска нашей программы. Второй всплеск, хотя и меньший, соответствует созданию единой ConfigMap
с установленными 10 000 наблюдениями.
Кэширование
Как вы догадались, сервер API Kubernetes использует стратегию кэширования для решения этой проблемы, и взаимодействие со слоем кэширования во многом похоже на прямой вызов etcd3 storage.Interface
. Фактически, уровень кэширования реализует storage.Interface
тоже.
При построении cacher.Cacher
передается структура Config
, которая включает в себя storage.Interface
. Как указано в строке документа.
// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).
Например, поскольку etcd
информирует наблюдателя о событиях создания (ADDED
), реализация Create() Cacher
напрямую вызывает storage Create()
.
// Create implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return c.storage.Create(ctx, key, obj, out, ttl)
}
Другие методы выполняют незначительные операции перед вызовом базового storage
, но Watch()
вообще не взаимодействует с ним напрямую. Вместо этого он создает новый cacheWatcher
и запускает свой цикл обработки событий. Прежде чем разобраться в том, как работает cacheWatcher
, нужно понять, как события доставляются из etcd
всем наблюдателям, запущенным этим методом. При создании Cacher создаются три важных базовых компонента, два из которых — реализация общих интерфейсов в пакете client-go cache
:
watchCache
— реализациейcache.Store
;cacherListerWatcher
— реализациейcache.ListerWatcher
;Reflector
принимает реализации двух ранее упомянутых интерфейсов и использует последний для заполнения первого.
Давайте рассмотрим каждый из них более подробно.
watchCache
watchCache
— место, где хранится фактический кэш событий, которые передаются наблюдателям. Это скользящее окно с возможностью изменения размера. Это означает, что при добавлении нового элемента мы либо удаляем самый старый, либо увеличиваем размер кэша, чтобы вместить добавление нового элемента. Как упоминалось ранее, watchCache
реализует cache.Store
, которое поддерживает следующие методы.
type Store interface {
// Add adds the given object to the accumulator associated with the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated with the given object's key
Delete(obj interface{}) error
// List returns a list of all the currently non-empty accumulators
List() []interface{}
// ListKeys returns a list of all the keys currently associated with non-empty accumulators
ListKeys() []string
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list, you should not reference
// it after calling this function.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
Реализации Add()
, Update()
, и Delete()
создают watch.Event
, затем передайте его методу processEvent()
. Здесь событие преобразуется в watchCacheEvent
, который передается updateCache()
. Метод updateCache()
вместе с вызываемым им методом resizeCacheLocked()
— это то, где реализована логика скользящего окна.
// Assumes that lock is already held for write.
func (w *watchCache) updateCache(event *watchCacheEvent) {
w.resizeCacheLocked(event.RecordTime)
if w.isCacheFullLocked() {
// Cache is full - remove the oldest element.
w.startIndex++
}
w.cache[w.endIndex%w.capacity] = event
w.endIndex++
}
// resizeCacheLocked resizes the cache if necessary:
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping).
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
capacity := min(w.capacity*2, w.upperBoundCapacity)
if capacity > w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
capacity := max(w.capacity/2, w.lowerBoundCapacity)
if capacity < w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
}
Перед возвратом из processEvent()
вызывается функция eventHandler()
, если она была передана во время построения watchCache
. Мы вернемся к этому вскоре, когда рассмотрим, как наблюдатели уведомляются о событиях.
// Avoid calling event handler under lock.
// This is safe as long as there is at most one call to Add/Update/Delete and
// UpdateResourceVersion in flight at any point in time, which is true now,
// because reflector calls them synchronously from its main thread.
if w.eventHandler != nil {
w.eventHandler(wcEvent)
}
Другие методы, особенно те, которые включают получение и перечисление, полагаются на базовый cache.Indexer
. В итоге простые индексы, созданные в watchCache
, сводятся к потокобезопасной map
. В то время как скользящее окно событий в watchCache
помогает отправлять обновления многим различным наблюдателям, индексатор позволяет новым наблюдателям легко получать последнее состояние объектов, что требуется при запуске наблюдения.
Вместе эти два механизма обслуживают запросы из их хранилища в памяти, вместо того чтобы обращаться к etcd
напрямую.
cacherListerWatcher
cacherListerWatcher
намного проще, чем watchCache
, так как оборачивает storage.Interface
, который передается Cacher
и вызывает базовые методы GetList()
и Watch()
.
// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
return &cacherListerWatcher{
storage: storage,
resourcePrefix: resourcePrefix,
newListFunc: newListFunc,
}
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
pred := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: options.Limit,
Continue: options.Continue,
}
storageOpts := storage.ListOptions{
ResourceVersionMatch: options.ResourceVersionMatch,
Predicate: pred,
Recursive: true,
}
if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
return nil, err
}
return list, nil
}
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
opts := storage.ListOptions{
ResourceVersion: options.ResourceVersion,
Predicate: storage.Everything,
Recursive: true,
ProgressNotify: true,
}
return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}
cacherListerWatcher
— то, что извлекает данные, которые сохраняются в watchCache
. Но нужен механизм, чтобы связать их вместе.
Reflector
watchCache
и cacherListerWatcher
создают перед Reflector
, чтобы передать их ему.
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)
Reflector
определен в пакете client-go/tools/cache
со следующей документальной строкой.
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
Большая часть функциональности вызывается из его метода ListAndWatch()
Он выполняет то, что следует из его названия: перечисляет, а затем просматривает. События, поступающие от ListerWatcher (cacherListerWatcher)
, обрабатываются в watchHandler()
. Именно здесь данные передаются в watchCache
.
case watch.Added:
err := store.Add(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Modified:
err := store.Update(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
}
case watch.Deleted:
// TODO: Will any consumers need access to the "last known
// state", which is passed in event.Object? If so, may need
// to change this.
err := store.Delete(event.Object)
if err != nil {
utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
}
Связываем всё вместе
Когда все компоненты на месте, пора запустить Reflector
, чтобы извлекать данные из cacherListerWatcher
и сохранять их в watchCache
, который будет информировать Cacher
об обновлениях.
go cacher.dispatchEvents()
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
defer cacher.terminateAllWatchers()
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)
}
}, time.Second, stopCh,
)
}()
Сначала рассмотрим второй вызов, cacher.startCaching(stopCh)
— так мы запускаем Reflector
с помощью вызова ListAndWatch()
.
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
// It is safe to use the cache after a successful list until a disconnection.
// We start with usable (write) locked. The below OnReplace function will
// unlock it after a successful list. The below defer will then re-lock
// it when this function exits (always due to disconnection), only if
// we actually got a successful list. This cycle will repeat as needed.
successfulList := false
c.watchCache.SetOnReplace(func() {
successfulList = true
c.ready.set(true)
klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
})
defer func() {
if successfulList {
c.ready.set(false)
}
}()
c.terminateAllWatchers()
// Note that since onReplace may be not called due to errors, we explicitly
// need to retry it on errors under lock.
// Also note that startCaching is called in a loop, so there's no need
// to have another loop here.
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
}
}
Это приведет к тому, что Reflector
начнет заполнять watchCache
, который уведомит Cacher
, чтобы он мог отправить его всем наблюдателям. Метод cacher.processEvent()
был передан в watchCache
как eventHandler()
, который мы видели ранее.
func (c *Cacher) processEvent(event *watchCacheEvent) {
if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
// Monitor if this gets backed up, and how much.
klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.groupResource.String(), curLen)
}
c.incoming <- *event
}
Всё отправляется по каналу c.incoming
, который затем считывается в методе cacher.dispatchEvents()
, прежде чем быть отправленным каждому наблюдателю.
case event, ok := <-c.incoming:
if !ok {
return
}
// Don't dispatch bookmarks coming from the storage layer.
// They can be very frequent (even to the level of subseconds)
// to allow efficient watch resumption on kube-apiserver restarts,
// and propagating them down may overload the whole system.
//
// TODO: If at some point we decide the performance and scalability
// footprint is acceptable, this is the place to hook them in.
// However, we then need to check if this was called as a result
// of a bookmark event or regular Add/Update/Delete operation by
// checking if resourceVersion here has changed.
if event.Type != watch.Bookmark {
c.dispatchEvent(&event)
}
lastProcessedResourceVersion = event.ResourceVersion
metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
Когда мы вызываем c.dispatchEvent(&event)
, список cacheWatcher
фильтруется по тем, кто заинтересован в данном событии, и событие отправляется каждому.
Метод Watch()
Это было непросто, но теперь, когда мы знаем, как обрабатываются события, пришло время вернуться к методу Watch()
. Вместо запуска нового наблюдения etcd
при вызове этого метода, создаем cacheWatcher
, который добавляется к кандидатам, когда Cacher
обрабатывает события. Каждый cacheWatcher
имеет свой собственный цикл обработки: watcher.processInterval()
. Метод Watch()
запускает цикл обработки для наблюдателя, который он создает после добавления его в список наблюдателей для Cacher
.
func() {
c.Lock()
defer c.Unlock()
// Update watcher.forget function once we can compute it.
watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
// Add it to the queue only when the client support watch bookmarks.
if watcher.allowWatchBookmarks {
c.bookmarkWatchers.addWatcher(watcher)
}
c.watcherIdx++
}()
go watcher.processInterval(ctx, cacheInterval, watchRV)
return watcher, nil
cacheInterval
создается ранее в методе и доставляет любые существующие события в watchCache
, которые интересуют cacheWatcher
. Метод watcher.processInterval()
сначала обрабатывает все события из cacheInterval
, затем начинает ожидать обработки будущих отправленных событий.
for {
event, err := cacheInterval.Next()
if err != nil {
klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
return
}
if event == nil {
break
}
c.sendWatchCacheEvent(event)
// With some events already sent, update resourceVersion so that
// events that were buffered and not yet processed won't be delivered
// to this watcher second time causing going back in time.
resourceVersion = event.ResourceVersion
initEventCount++
}
Примечание: некоторые комментарии к коду удалены для краткости.
Обработка событий состоит из вызова функции sendWatchCacheEvent()
, которая преобразует его. Затем доставляет по каналу result, который становится тем же каналом, из которого вызывающие функцию Watch()
могут считывать данные!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
watchEvent := c.convertToWatchEvent(event)
if watchEvent == nil {
// Watcher is not interested in that object.
return
}
select {
case <-c.done:
return
default:
}
select {
case c.result <- *watchEvent:
case <-c.done:
}
}
Примечание: некоторые комментарии к коду удалены для краткости.
Все смотрят… Продуктивно
Давайте посмотрим на реализацию Cacher storage.Interface
в действии. Мы берём нашу программу и модифицируем её, чтобы передать реализацию хранилища etcd Cacher
.
cachewatch.go
package main
import (
"context"
"fmt"
"os"
"os/signal"
"go.etcd.io/etcd/client/pkg/v3/transport"
clientv3 "go.etcd.io/etcd/client/v3"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/cacher"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
"k8s.io/kubernetes/pkg/apis/core"
k8sv1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/utils/clock"
)
func main() {
tlsConfig, err := (transport.TLSInfo{
CertFile: "./pki/etcd/server.crt",
KeyFile: "./pki/etcd/server.key",
TrustedCAFile: "./pki/etcd/ca.crt",
ClientCertFile: "./pki/apiserver-etcd-client.crt",
ClientKeyFile: "./pki/apiserver-etcd-client.key",
}).ClientConfig()
if err != nil {
panic(err)
}
c, err := clientv3.New(clientv3.Config{
Endpoints: []string{"https://127.0.0.1:2379"},
TLS: tlsConfig,
})
if err != nil {
panic(err)
}
scheme := runtime.NewScheme()
k8sv1.AddToScheme(scheme)
core.AddToScheme(scheme)
f := serializer.NewCodecFactory(scheme)
s := etcd3.New(c, f.CodecForVersions(nil, f.UniversalDecoder(), nil, k8sv1.SchemeGroupVersion), nil, "registry", v1.Resource("ConfigMap"), identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
ca, err := cacher.NewCacherFromConfig(cacher.Config{
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: v1.Resource("configmaps"),
ResourcePrefix: "configmaps",
KeyFunc: func(o runtime.Object) (string, error) { return storage.NamespaceKeyFunc("configmaps", o) },
GetAttrsFunc: func(o runtime.Object) (label labels.Set, field fields.Set, err error) {
return labels.Set{},
fields.Set{}, nil
},
NewFunc: func() runtime.Object {
return &v1.ConfigMap{}
},
NewListFunc: func() runtime.Object {
return &v1.ConfigMapList{}
},
Codec: f.LegacyCodec(k8sv1.SchemeGroupVersion),
Clock: clock.RealClock{},
})
if err != nil {
panic(err)
}
for i := 0; i < 10000; i++ {
w, err := ca.Watch(context.Background(), "configmaps", storage.ListOptions{
Predicate: storage.Everything,
Recursive: true,
})
if err != nil {
panic(err)
}
go func() {
for e := range w.ResultChan() {
switch o := e.Object.(type) {
case *v1.ConfigMap:
fmt.Printf("%s || %s/%s\n", e.Type, o.Namespace, o.Name)
default:
co, ok := o.(runtime.CacheableObject)
if !ok {
panic("unknown event")
}
c, ok := co.GetObject().(*v1.ConfigMap)
if !ok {
panic("unknown object")
}
fmt.Printf("%s || %s/%s\n", e.Type, c.Namespace, c.Name)
}
}
}()
}
stop := make(chan os.Signal)
signal.Notify(stop, os.Interrupt)
<-stop
}
Поскольку Prometheus
все еще запущен и очищает наш экземпляр etcd
в нашем кластере kind
, мы запускаем нашу программу и видим влияние на процессор.
$ go run cachewatch.go
...
ADDED || kube-system/kube-proxy
ADDED || kube-public/cluster-info
ADDED || kube-system/coredns
ADDED || default/kube-root-ca.crt
ADDED || kube-public/kube-root-ca.crt
ADDED || kube-system/kubelet-config
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubeadm-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config
ADDED || default/kube-root-ca.crt
...
Хотя установлено 10 000 наблюдателей, в нашем экземпляре etcd
нет заметного влияния на процессор. То же самое справедливо, если мы создадим новую ConfigMap
.
$ kubectl create configmap testing
configmap/testing created
...
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
...
Диапазон оси Y 0.00
до 0.18
.
Заключительные мысли
Это была вторая часть серии приключений сервера Kubernetes API. Основываясь на нашем storage.Interface, мы узнали, как Kubernetes оптимизирует работу watch перед лицом потенциально большого количества клиентов. Как и в большинстве случаев в разработке программного обеспечения, эта оптимизация не лишена компромиссов, но она достигает цели сделать общий случай более эффективным. В «Kubernetes: Мега» мы рассматриваем и затрагиваем такие области управления инфраструктурой, в которых каждое улучшение может помочь компании сэкономить сотни тысяч рублей.
Мы посмотрим, есть ли какие-либо случаи, когда этот компромисс вызывает проблемы, в будущих публикациях.