В нашей предыдущей статье про приключения сервера Kubernetes API мы рассмотрели интерфейс хранилища и исследовали единственную реализацию в дереве: etcd3. Однако внимательное прочтение сносок в этом посте показало, что мы были не совсем честными, говоря, что etcd3 — единственная реализация.

Это можно оспорить, ведь Cacher также может быть технической реализацией, хотя ему и требуется базовая реализация. Подробно рассмотрим кэширование в одном из будущих постов. А сейчас подробно изучим кэширование и его возможности. 

О чём поговорим в статье:

  • Наблюдение (???? «Я хочу проблему и ее решение»);

  • Все смотрят;

  • Кэширование (???? «Мне просто нужно решение»);

  • watchCache ;

  • cacherListerWatcher ;

  • Reflector;

  • Свяжем всё вместе; 

  • Метод  Watch();

  • Все смотрят… Успешно (???? «Просто покажите мне код»).

Наблюдение

Прежде чем мы введем какой-либо тип кэширования, стоит вернуться к тому факту, что storage.Interface включает в себя метод Watch, реализация etcd3 поддерживает его. Ранее мы изучали возможность отслеживания изменений, обращаясь непосредственно к etcd с помощью etcdctl. etcd предлагает Watch API, который использует двунаправленные потоки gRPC для доставки событий клиентам при изменении значения для ключа.

Реализация etcd3 Watch создает watch.Interface, поддерживаемый watcher. Новый watcher создается при каждом вызове Watch, и каждый watcher, по сути, просто вызывает etcd Watch API и пересылает события по каналу.

Давайте возьмем нашу программу из предыдущего поста и модифицируем её, чтобы отслеживать изменения во всех ConfigMaps в кластере.

Вы можете найти инструкции о том, как получить данные PCI, необходимые для связи с etcd в кластере kind, здесь.

directwatch.go

package main

import (
	"context"
	"fmt"

	"go.etcd.io/etcd/client/pkg/v3/transport"
	clientv3 "go.etcd.io/etcd/client/v3"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/apiserver/pkg/storage/etcd3"
	"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
	"k8s.io/kubernetes/pkg/apis/core"
	k8sv1 "k8s.io/kubernetes/pkg/apis/core/v1"
)

func main() {
	tlsConfig, err := (transport.TLSInfo{
		CertFile:       "./pki/etcd/server.crt",
		KeyFile:        "./pki/etcd/server.key",
		TrustedCAFile:  "./pki/etcd/ca.crt",
		ClientCertFile: "./pki/apiserver-etcd-client.crt",
		ClientKeyFile:  "./pki/apiserver-etcd-client.key",
	}).ClientConfig()
	if err != nil {
		panic(err)
	}
	c, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"https://127.0.0.1:2379"},
		TLS:       tlsConfig,
	})
	if err != nil {
		panic(err)
	}

	scheme := runtime.NewScheme()
	k8sv1.AddToScheme(scheme)
	core.AddToScheme(scheme)
	f := serializer.NewCodecFactory(scheme)
	s := etcd3.New(c, f.CodecForVersions(nil, f.UniversalDecoder(), nil, k8sv1.SchemeGroupVersion), nil, "registry", v1.Resource("ConfigMap"), identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
	w, err := s.Watch(context.Background(), "configmaps", storage.ListOptions{
		Predicate: storage.Everything,
		Recursive: true,
	})
	if err != nil {
		panic(err)
	}
	for e := range w.ResultChan() {
		co, ok := e.Object.(*v1.ConfigMap)
		if !ok {
			panic("not a config map!")
		}
		fmt.Printf("%s || %s/%s\n", e.Type, co.Namespace, co.Name)
	}
}

Обратите внимание на тот факт, что мы регистрируем типы из k8s.io/kubernetes скорее, чем k8s.io/api. Это нужно, чтобы выполнить преобразование internal версии в нашу желаемую версию (v1). Мы также передаем nil  CodecForVersions, поскольку декодируем их только при чтении событий потока.

Если у вас запущен кластер kind, вы можете начать перенаправление портов etcd Pod.

$ kubectl port-forward -n kube-system pod/etcd-kind-control-plane 2379:2379

Теперь мы можем начать нашу программу.

$ go run directwatch.go
ADDED || default/kube-root-ca.crt
ADDED || kube-node-lease/kube-root-ca.crt
ADDED || kube-public/cluster-info
ADDED || kube-public/kube-root-ca.crt
ADDED || kube-system/coredns
ADDED || kube-system/extension-apiserver-authentication
ADDED || kube-system/kube-proxy
ADDED || kube-system/kube-root-ca.crt
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubelet-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config

Как и ожидалось, мы получаем ADDED события для всех ConfigMaps, находящихся в данный момент в кластере. Можно создавать, обновлять и удалять ConfigMap, чтобы увидеть, как другие события отражаются в потоке просмотра.

$ kubectl create configmap k8s-asa --from-literal hello=world
configmap/k8s-asa created

$ kubectl create configmap k8s-asa --from-literal hello=asa -o yaml --dry-run | kubectl apply -f -
configmap/k8s-asa configured

$ kubectl delete configmap k8s-asa
configmap "k8s-asa" deleted

Посмотрим на каждое из этих событий в потоке просмотра с соответствующим типом.

ADDED || default/k8s-asa
MODIFIED || default/k8s-asa
DELETED || default/k8s-asa

Общее наблюдение

Теперь мы можем получать события в любое время, когда меняется интересующий нас тип, и не похоже, что etcd или наша маленькая программа сильно «потеют». Но давайте взглянем на некоторые показатели etcd, чтобы убедиться в этом. Мы можем найти открытый адрес метрик для etcd в нашем кластере kind, просмотрев команду для контейнера в модуле Pod.

$ kubectl get pod -n kube-system etcd-kind-control-plane -o=jsonpath={.spec.containers[0].command} | jq .

[
  "etcd",
  "--advertise-client-urls=https://172.19.0.2:2379",
  "--cert-file=/etc/kubernetes/pki/etcd/server.crt",
  "--client-cert-auth=true",
  "--data-dir=/var/lib/etcd",
  "--experimental-initial-corrupt-check=true",
  "--experimental-watch-progress-notify-interval=5s",
  "--initial-advertise-peer-urls=https://172.19.0.2:2380",
  "--initial-cluster=kind-control-plane=https://172.19.0.2:2380",
  "--key-file=/etc/kubernetes/pki/etcd/server.key",
  "--listen-client-urls=https://127.0.0.1:2379,https://172.19.0.2:2379",
  "--listen-metrics-urls=http://127.0.0.1:2381",
  "--listen-peer-urls=https://172.19.0.2:2380",
  "--name=kind-control-plane",
  "--peer-cert-file=/etc/kubernetes/pki/etcd/peer.crt",
  "--peer-client-cert-auth=true",
  "--peer-key-file=/etc/kubernetes/pki/etcd/peer.key",
  "--peer-trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt",
  "--snapshot-count=10000",
  "--trusted-ca-file=/etc/kubernetes/pki/etcd/ca.crt"
]

--listen-metrics-urls  — то, что мы ищем .etcd будет обслуживать показатели Prometheus в конечной точке /metrics по этому адресу. Создаём минимальный конфигурационный файл Prometheus, чтобы дать команду сборщику выполнить очистку etcd.

prometheus.yml

global:
  scrape_interval: 1s
scrape_configs:
  - job_name: etcd
    static_configs:
    - targets: ['127.0.0.1:2379']

Прежде чем мы запустим Prometheus, убедитесь, что можно получить доступ к конечной точке etcd metrics. Перенаправление портов в пространство имен host network позволит запускать Prometheus в пространстве имен host network, что упрощает доступ к панели мониторинга в браузере.

Перенесите конечную точку etcd metrics.

$ kubectl port-forward -n kube-system pod/etcd-kind-control-plane 2381:2381

Затем запустите контейнер Prometheus, подключённый к сети хоста.

$ docker run --net=host --rm -v $(pwd)/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus

Панель мониторинга Prometheus теперь должна быть доступна в браузере по адресу 127.0.0.1:9090. Мы можем получить график показателей, которые предоставляет etcd, введя запрос. Например, irate(process_cpu_seconds_total{job="etcd"}[5m]) покажет скорость увеличения общего системного и пользовательского процессорного времени. Если запустить программу сейчас, которая устанавливает просмотры для ConfigMaps, мы не увидим существенного влияния на процессор.

Диапазон по оси Y составляет от 0.03 до 0.23.

Примечание: Поскольку мы запускаем etcd в кластере kind, сервер API Kubernetes и другие компоненты взаимодействуют с ним в дополнение к нагрузке от программы.

Этого и следовало ожидать: одни часы вряд ли окажут существенное влияние. Но если мы увеличим количество часов, то начнем сталкиваться с проблемами. Давайте настроим нашу программу так, чтобы она запускала 10 000 просмотров на ConfigMaps.

manydirectwatch.go

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	"go.etcd.io/etcd/client/pkg/v3/transport"
	clientv3 "go.etcd.io/etcd/client/v3"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/apiserver/pkg/storage/etcd3"
	"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
	"k8s.io/kubernetes/pkg/apis/core"
	k8sv1 "k8s.io/kubernetes/pkg/apis/core/v1"
)

func main() {
	tlsConfig, err := (transport.TLSInfo{
		CertFile:       "./pki/etcd/server.crt",
		KeyFile:        "./pki/etcd/server.key",
		TrustedCAFile:  "./pki/etcd/ca.crt",
		ClientCertFile: "./pki/apiserver-etcd-client.crt",
		ClientKeyFile:  "./pki/apiserver-etcd-client.key",
	}).ClientConfig()
	if err != nil {
		panic(err)
	}
	c, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"https://127.0.0.1:2379"},
		TLS:       tlsConfig,
	})
	if err != nil {
		panic(err)
	}

	scheme := runtime.NewScheme()
	k8sv1.AddToScheme(scheme)
	core.AddToScheme(scheme)
	f := serializer.NewCodecFactory(scheme)
	s := etcd3.New(c, f.CodecForVersions(nil, f.UniversalDecoder(), nil, k8sv1.SchemeGroupVersion), nil, "registry", v1.Resource("ConfigMap"), identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())
	for i := 0; i < 10000; i++ {
		w, err := s.Watch(context.Background(), "configmaps", storage.ListOptions{
			Predicate: storage.Everything,
			Recursive: true,
		})
		if err != nil {
			panic(err)
		}
		go func() {
			for e := range w.ResultChan() {
				co, ok := e.Object.(*v1.ConfigMap)
				if !ok {
					panic("not a config map!")
				}
				fmt.Printf("%s || %s/%s\n", e.Type, co.Namespace, co.Name)
			}
		}()
	}

	stop := make(chan os.Signal)
	signal.Notify(stop, os.Interrupt)
	<-stop
}

Если мы запустим программу сейчас, то должны увидеть, что все 10 000 просмотров получают список всех ConfigMaps в кластере. Когда создадим новую ConfigMap, мы сможем увидеть, что все часы получают уведомления.

$ go run manysimplewatch.go
...
ADDED || kube-system/kube-proxy
ADDED || kube-system/kube-root-ca.crt
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubelet-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config
ADDED || kube-system/kube-root-ca.crt
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubelet-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config
...
$ kubectl create configmap testing
configmap/testing created
...
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
ADDED || default/testing
...

Оглядываясь назад на наши показатели etcd, мы видим значительный скачок производительности процессора при запуске программы. Это вызывает беспокойство, но, возможно, еще более неприятным является объем работы, который был проделан при создании новой ConfigMap. Пока нагрузка моделируется в программе, сервер Kubernetes API обслуживает запросы на просмотр от различных клиентов. Устанавливать наблюдение в etcd для каждого из них нет необходимости, ведь серверу API требуется только одно уведомление, чтобы проинформировать всех клиентов.

Диапазон значений по оси Y составляет от 0,00 до 3,00.

Первый большой всплеск соответствует моменту запуска нашей программы. Второй всплеск, хотя и меньший, соответствует созданию единой ConfigMap с установленными 10 000 наблюдениями.

Кэширование

Как вы догадались, сервер API Kubernetes использует стратегию кэширования для решения этой проблемы, и взаимодействие со слоем кэширования во многом похоже на прямой вызов etcd3 storage.Interface. Фактически, уровень кэширования реализует storage.Interface тоже.

При построении cacher.Cacher передается структура Config, которая включает в себя storage.Interface. Как указано в строке документа

// Cacher implements storage.Interface (although most of the calls are just
// delegated to the underlying storage).

Например, поскольку etcd информирует наблюдателя о событиях создания (ADDED), реализация Create() Cacher напрямую вызывает storage Create().

// Create implements storage.Interface.
func (c *Cacher) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
	return c.storage.Create(ctx, key, obj, out, ttl)
}

Другие методы выполняют незначительные операции перед вызовом базового storage, но Watch() вообще не взаимодействует с ним напрямую. Вместо этого он создает новый cacheWatcher и запускает свой цикл обработки событий. Прежде чем разобраться в том, как работает cacheWatcher, нужно понять, как события доставляются из etcd всем наблюдателям, запущенным этим методом. При создании Cacher создаются три важных базовых компонента, два из которых — реализация общих интерфейсов в пакете client-go cache:

  • watchCache — реализацией cache.Store;

  • cacherListerWatcher — реализацией cache.ListerWatcher;

  • Reflector принимает реализации двух ранее упомянутых интерфейсов и использует последний для заполнения первого.

Давайте рассмотрим каждый из них более подробно.

watchCache

watchCache  — место, где хранится фактический кэш событий, которые передаются наблюдателям. Это скользящее окно с возможностью изменения размера. Это  означает, что при добавлении нового элемента мы либо удаляем самый старый, либо увеличиваем размер кэша, чтобы вместить добавление нового элемента. Как упоминалось ранее, watchCache реализует cache.Store, которое поддерживает следующие методы.

type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}

Реализации Add(), Update(), и Delete() создают watch.Event, затем передайте его методу processEvent(). Здесь событие преобразуется в watchCacheEvent, который передается updateCache(). Метод updateCache() вместе с вызываемым им методом resizeCacheLocked() — это то, где реализована логика скользящего окна.

// Assumes that lock is already held for write.
func (w *watchCache) updateCache(event *watchCacheEvent) {
	w.resizeCacheLocked(event.RecordTime)
	if w.isCacheFullLocked() {
		// Cache is full - remove the oldest element.
		w.startIndex++
	}
	w.cache[w.endIndex%w.capacity] = event
	w.endIndex++
}

// resizeCacheLocked resizes the cache if necessary:
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping).
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
	if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
		capacity := min(w.capacity*2, w.upperBoundCapacity)
		if capacity > w.capacity {
			w.doCacheResizeLocked(capacity)
		}
		return
	}
	if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
		capacity := max(w.capacity/2, w.lowerBoundCapacity)
		if capacity < w.capacity {
			w.doCacheResizeLocked(capacity)
		}
		return
	}
}

Перед возвратом из processEvent() вызывается функция eventHandler(), если она была передана во время построения watchCache. Мы вернемся к этому вскоре, когда рассмотрим, как наблюдатели уведомляются о событиях.

// Avoid calling event handler under lock.
	// This is safe as long as there is at most one call to Add/Update/Delete and
	// UpdateResourceVersion in flight at any point in time, which is true now,
	// because reflector calls them synchronously from its main thread.
	if w.eventHandler != nil {
		w.eventHandler(wcEvent)
	}

Другие методы, особенно те, которые включают получение и перечисление, полагаются на базовый cache.Indexer. В итоге простые индексы, созданные в watchCache, сводятся к потокобезопасной map. В то время как скользящее окно событий в watchCache помогает отправлять обновления многим различным наблюдателям, индексатор позволяет новым наблюдателям легко получать последнее состояние объектов, что требуется при запуске наблюдения.

Вместе эти два механизма обслуживают запросы из их хранилища в памяти, вместо того чтобы обращаться к etcd напрямую.

cacherListerWatcher

cacherListerWatcher намного проще, чем watchCache, так как оборачивает storage.Interface, который передается Cacher и вызывает базовые методы GetList() и Watch().

// NewCacherListerWatcher returns a storage.Interface backed ListerWatcher.
func NewCacherListerWatcher(storage storage.Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
	return &cacherListerWatcher{
		storage:        storage,
		resourcePrefix: resourcePrefix,
		newListFunc:    newListFunc,
	}
}

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
	list := lw.newListFunc()
	pred := storage.SelectionPredicate{
		Label:    labels.Everything(),
		Field:    fields.Everything(),
		Limit:    options.Limit,
		Continue: options.Continue,
	}

	storageOpts := storage.ListOptions{
		ResourceVersionMatch: options.ResourceVersionMatch,
		Predicate:            pred,
		Recursive:            true,
	}
	if err := lw.storage.GetList(context.TODO(), lw.resourcePrefix, storageOpts, list); err != nil {
		return nil, err
	}
	return list, nil
}

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
	opts := storage.ListOptions{
		ResourceVersion: options.ResourceVersion,
		Predicate:       storage.Everything,
		Recursive:       true,
		ProgressNotify:  true,
	}
	return lw.storage.Watch(context.TODO(), lw.resourcePrefix, opts)
}

cacherListerWatcher — то, что извлекает данные, которые сохраняются в watchCache. Но нужен механизм, чтобы связать их вместе.

Reflector

watchCache и cacherListerWatcher создают перед Reflector, чтобы передать их ему.

watchCache := newWatchCache(
		config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource)
	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
	reflectorName := "storage/cacher.go:" + config.ResourcePrefix

	reflector := cache.NewNamedReflector(reflectorName, listerWatcher, obj, watchCache, 0)

Reflector определен в пакете client-go/tools/cache со следующей документальной строкой.

// Reflector watches a specified resource and causes all changes to be reflected in the given store.

Большая часть функциональности вызывается из его метода ListAndWatch() Он выполняет то, что следует из его названия: перечисляет, а затем просматривает. События, поступающие от ListerWatcher (cacherListerWatcher), обрабатываются в watchHandler(). Именно здесь данные передаются в watchCache.

		case watch.Added:
				err := store.Add(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Modified:
				err := store.Update(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", name, event.Object, err))
				}
			case watch.Deleted:
				// TODO: Will any consumers need access to the "last known
				// state", which is passed in event.Object? If so, may need
				// to change this.
				err := store.Delete(event.Object)
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", name, event.Object, err))
				}

Связываем всё вместе

Когда все компоненты на месте, пора запустить Reflector, чтобы извлекать данные из cacherListerWatcher и сохранять их в watchCache, который будет информировать Cacher об обновлениях.

go cacher.dispatchEvents()

	cacher.stopWg.Add(1)
	go func() {
		defer cacher.stopWg.Done()
		defer cacher.terminateAllWatchers()
		wait.Until(
			func() {
				if !cacher.isStopped() {
					cacher.startCaching(stopCh)
				}
			}, time.Second, stopCh,
		)
	}()

Сначала рассмотрим второй вызов, cacher.startCaching(stopCh) — так мы запускаем Reflector с помощью вызова ListAndWatch().

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
	// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
	// It is safe to use the cache after a successful list until a disconnection.
	// We start with usable (write) locked. The below OnReplace function will
	// unlock it after a successful list. The below defer will then re-lock
	// it when this function exits (always due to disconnection), only if
	// we actually got a successful list. This cycle will repeat as needed.
	successfulList := false
	c.watchCache.SetOnReplace(func() {
		successfulList = true
		c.ready.set(true)
		klog.V(1).Infof("cacher (%v): initialized", c.groupResource.String())
		metrics.WatchCacheInitializations.WithLabelValues(c.groupResource.String()).Inc()
	})
	defer func() {
		if successfulList {
			c.ready.set(false)
		}
	}()

	c.terminateAllWatchers()
	// Note that since onReplace may be not called due to errors, we explicitly
	// need to retry it on errors under lock.
	// Also note that startCaching is called in a loop, so there's no need
	// to have another loop here.
	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
		klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
	}
}

Это приведет к тому, что Reflector начнет заполнять watchCache, который уведомит Cacher, чтобы он мог отправить его всем наблюдателям. Метод cacher.processEvent() был передан в watchCache как eventHandler(), который мы видели ранее.

func (c *Cacher) processEvent(event *watchCacheEvent) {
	if curLen := int64(len(c.incoming)); c.incomingHWM.Update(curLen) {
		// Monitor if this gets backed up, and how much.
		klog.V(1).Infof("cacher (%v): %v objects queued in incoming channel.", c.groupResource.String(), curLen)
	}
	c.incoming <- *event
}

Всё отправляется по каналу c.incoming, который затем считывается в методе cacher.dispatchEvents(), прежде чем быть отправленным каждому наблюдателю.

	case event, ok := <-c.incoming:
			if !ok {
				return
			}
			// Don't dispatch bookmarks coming from the storage layer.
			// They can be very frequent (even to the level of subseconds)
			// to allow efficient watch resumption on kube-apiserver restarts,
			// and propagating them down may overload the whole system.
			//
			// TODO: If at some point we decide the performance and scalability
			// footprint is acceptable, this is the place to hook them in.
			// However, we then need to check if this was called as a result
			// of a bookmark event or regular Add/Update/Delete operation by
			// checking if resourceVersion here has changed.
			if event.Type != watch.Bookmark {
				c.dispatchEvent(&event)
			}
			lastProcessedResourceVersion = event.ResourceVersion
			metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()

Когда мы вызываем c.dispatchEvent(&event), список cacheWatcher фильтруется по тем, кто заинтересован в данном событии, и событие отправляется каждому.

Метод Watch()

Это было непросто, но теперь, когда мы знаем, как обрабатываются события, пришло время вернуться к методу Watch(). Вместо запуска нового наблюдения etcd при вызове этого метода, создаем cacheWatcher, который добавляется к кандидатам, когда Cacher обрабатывает события. Каждый cacheWatcher имеет свой собственный цикл обработки: watcher.processInterval(). Метод Watch() запускает цикл обработки для наблюдателя, который он создает после добавления его в список наблюдателей для Cacher.

func() {
		c.Lock()
		defer c.Unlock()
		// Update watcher.forget function once we can compute it.
		watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, triggerValue, triggerSupported)
		c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)

		// Add it to the queue only when the client support watch bookmarks.
		if watcher.allowWatchBookmarks {
			c.bookmarkWatchers.addWatcher(watcher)
		}
		c.watcherIdx++
	}()

	go watcher.processInterval(ctx, cacheInterval, watchRV)
	return watcher, nil

cacheInterval создается ранее в методе и доставляет любые существующие события в watchCache, которые интересуют cacheWatcher. Метод watcher.processInterval() сначала обрабатывает все события из cacheInterval, затем начинает ожидать обработки будущих отправленных событий.

	for {
		event, err := cacheInterval.Next()
		if err != nil {
			klog.Warningf("couldn't retrieve watch event to serve: %#v", err)
			return
		}
		if event == nil {
			break
		}
		c.sendWatchCacheEvent(event)
		// With some events already sent, update resourceVersion so that
		// events that were buffered and not yet processed won't be delivered
		// to this watcher second time causing going back in time.
		resourceVersion = event.ResourceVersion
		initEventCount++
	}

Примечание: некоторые комментарии к коду удалены для краткости.

Обработка событий состоит из вызова функции sendWatchCacheEvent(), которая преобразует его. Затем доставляет по каналу result, который становится тем же каналом, из которого вызывающие функцию Watch() могут считывать данные!

func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
	watchEvent := c.convertToWatchEvent(event)
	if watchEvent == nil {
		// Watcher is not interested in that object.
		return
	}

	select {
	case <-c.done:
		return
	default:
	}

	select {
	case c.result <- *watchEvent:
	case <-c.done:
	}
}

Примечание: некоторые комментарии к коду удалены для краткости.

Все смотрят… Продуктивно

Давайте посмотрим на реализацию Cacher storage.Interface в действии. Мы берём нашу программу и модифицируем её, чтобы передать реализацию хранилища etcd Cacher.

cachewatch.go

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	"go.etcd.io/etcd/client/pkg/v3/transport"
	clientv3 "go.etcd.io/etcd/client/v3"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apiserver/pkg/storage"
	"k8s.io/apiserver/pkg/storage/cacher"
	"k8s.io/apiserver/pkg/storage/etcd3"
	"k8s.io/apiserver/pkg/storage/value/encrypt/identity"
	"k8s.io/kubernetes/pkg/apis/core"
	k8sv1 "k8s.io/kubernetes/pkg/apis/core/v1"
	"k8s.io/utils/clock"
)

func main() {
	tlsConfig, err := (transport.TLSInfo{
		CertFile:       "./pki/etcd/server.crt",
		KeyFile:        "./pki/etcd/server.key",
		TrustedCAFile:  "./pki/etcd/ca.crt",
		ClientCertFile: "./pki/apiserver-etcd-client.crt",
		ClientKeyFile:  "./pki/apiserver-etcd-client.key",
	}).ClientConfig()
	if err != nil {
		panic(err)
	}
	c, err := clientv3.New(clientv3.Config{
		Endpoints: []string{"https://127.0.0.1:2379"},
		TLS:       tlsConfig,
	})
	if err != nil {
		panic(err)
	}

	scheme := runtime.NewScheme()
	k8sv1.AddToScheme(scheme)
	core.AddToScheme(scheme)
	f := serializer.NewCodecFactory(scheme)
	s := etcd3.New(c, f.CodecForVersions(nil, f.UniversalDecoder(), nil, k8sv1.SchemeGroupVersion), nil, "registry", v1.Resource("ConfigMap"), identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig())

	ca, err := cacher.NewCacherFromConfig(cacher.Config{
		Storage:        s,
		Versioner:      storage.APIObjectVersioner{},
		GroupResource:  v1.Resource("configmaps"),
		ResourcePrefix: "configmaps",
		KeyFunc:        func(o runtime.Object) (string, error) { return storage.NamespaceKeyFunc("configmaps", o) },
		GetAttrsFunc: func(o runtime.Object) (label labels.Set, field fields.Set, err error) {
			return labels.Set{},
				fields.Set{}, nil
		},
		NewFunc: func() runtime.Object {
			return &v1.ConfigMap{}
		},
		NewListFunc: func() runtime.Object {
			return &v1.ConfigMapList{}
		},
		Codec: f.LegacyCodec(k8sv1.SchemeGroupVersion),
		Clock: clock.RealClock{},
	})
	if err != nil {
		panic(err)
	}
	for i := 0; i < 10000; i++ {
		w, err := ca.Watch(context.Background(), "configmaps", storage.ListOptions{
			Predicate: storage.Everything,
			Recursive: true,
		})
		if err != nil {
			panic(err)
		}
		go func() {
			for e := range w.ResultChan() {
				switch o := e.Object.(type) {
				case *v1.ConfigMap:
					fmt.Printf("%s || %s/%s\n", e.Type, o.Namespace, o.Name)
				default:
					co, ok := o.(runtime.CacheableObject)
					if !ok {
						panic("unknown event")
					}
					c, ok := co.GetObject().(*v1.ConfigMap)
					if !ok {
						panic("unknown object")
					}
					fmt.Printf("%s || %s/%s\n", e.Type, c.Namespace, c.Name)
				}
			}
		}()
	}

	stop := make(chan os.Signal)
	signal.Notify(stop, os.Interrupt)
	<-stop
}

Поскольку Prometheus все еще запущен и очищает наш экземпляр etcd в нашем кластере kind, мы запускаем нашу программу и видим влияние на процессор.

$ go run cachewatch.go
...
ADDED || kube-system/kube-proxy
ADDED || kube-public/cluster-info
ADDED || kube-system/coredns
ADDED || default/kube-root-ca.crt
ADDED || kube-public/kube-root-ca.crt
ADDED || kube-system/kubelet-config
ADDED || kube-system/kubeadm-config
ADDED || kube-system/kubeadm-config
ADDED || local-path-storage/kube-root-ca.crt
ADDED || local-path-storage/local-path-config
ADDED || default/kube-root-ca.crt
...

Хотя установлено 10 000 наблюдателей, в нашем экземпляре etcd нет заметного влияния на процессор. То же самое справедливо, если мы создадим новую ConfigMap.

$ kubectl create configmap testing
configmap/testing created
...
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
ADDED || default/testing2
...

Диапазон оси 0.00до 0.18.

Заключительные мысли

Это была вторая часть серии приключений сервера Kubernetes API. Основываясь на нашем storage.Interface, мы узнали, как Kubernetes оптимизирует работу watch перед лицом потенциально большого количества клиентов. Как и в большинстве случаев в разработке программного обеспечения, эта оптимизация не лишена компромиссов, но она достигает цели сделать общий случай более эффективным. В «Kubernetes: Мега» мы рассматриваем и затрагиваем такие области управления инфраструктурой, в которых каждое улучшение может помочь компании сэкономить сотни тысяч рублей.

https://slurm.club/3KhyGtd

Мы посмотрим, есть ли какие-либо случаи, когда этот компромисс вызывает проблемы, в будущих публикациях.

Комментарии (0)