Привет! Я Артём, скромный платформенный инженер: код пишу, метрики смотрю и иногда даже понимаю, что происходит. В работе мне часто приходится сталкиваться с Observability‑инструментами. Одним из таких инструментов, о котором я хотел бы рассказать - OpenTelemetry Collector. Это мощный инструмент, который позволяет работать с различной телеметрией и строить гибкие пайплайны для метрик, логов и трейсов.

Но иногда возможностей стандартного набора компонентов не хватает, чтобы справиться с поставленными задачами без использования костылей и изоленты. Тогда на сцену выходят кастомные компоненты для Otel-Collector.

Содержание

Введение

В стандартном наборе opentelemetry-collector-contrib много готовых процессоров: фильтрация, нормализация, лимит памяти и так далее. Но у нас возникла задача: обогащать спаны данными из внешнего справочника (в нашем случае выбор пал на CSV‑файл).

Например, у нас есть спан с атрибутом SolObjectID.
А в CSV лежит таблица, где этому SolObjectID соответствует поле trace_code, очень нужное нам, по каким-то причинам. Хочется, чтобы в трейсе появлялся этот trace_code без дополнительного изменения кода в сервисах. К сожалению, готовых решений в contrib версии OpenTelemetry Collector не предусмотрено, однако, такой функционал очень даже хочется иметь. Взять и написать свой процессор - кажется хорошей идеей.

В этой статье я расскажу про свой опыт написания кастомного процессора для otel-collector. Он обогащает спаны данными из CSV‑файла: находит совпадение по атрибуту и добавляет дополнительные поля прямо внутрь трейсов. Мы разберём архитектуру процессора, посмотрим код и конфигурацию, а в конце покажу, как собрать и запустить Collector с этим расширением.

Архитектура кастомного процессора

Любой компонент в Collector (receiver, processor, exporter, connector) устроен по одному принципу: у него есть Config, Factory и сама логика (в нашем случае Processor).

1. Config (config.go)

Config описывает, какие параметры мы можем задать в YAML‑конфигурации Collector.
В OpenTelemetry Collector каждый компонент имеет свой конфиг. Collector ожидает, что этот конфиг будет реализовывать интерфейс component.Config.

Collector не знает заранее, какие именно поля есть у нашего конфига. Но он знает, что любой конфиг - это объект, который реализует интерфейс component.Config. Это позволяет Collector работать с любым компонентом одинаково.

Фрагмент Config кода:

type Config struct {
	CSVPath        string        `mapstructure:"csv_path"`
	MatchField     string        `mapstructure:"match_field"`
	EnrichColumns  []string      `mapstructure:"enrich_columns"`
	ReloadInterval time.Duration `mapstructure:"reload_interval"`
}

var _ component.Config = (*Config)(nil)

Что здесь важно:

  • CSVPath - путь к CSV‑файлу.

  • MatchField - атрибут спана, по которому ищем совпадения.

  • EnrichColumns - список колонок, которые нужно добавить в спан.

  • ReloadInterval - как часто обновлять данные из CSV без перезапуска Collector.

Пример использования в collector-config.yaml:

processors:
  csvenricherprocessor:
    csv_path: "/etc/mapping.csv"
    match_field: "SolObjectID"
    enrich_columns: [ "trace_code", "product_id" ]
    reload_interval: "5m"

2. Factory (factory.go)

Factory - это «фабрика» для создания процессора. Collector работает по контракту: каждый компонент обязан предоставить фабрику, чтобы Collector мог его корректно зарегистрировать и собрать в пайплайн.

Фабрика отвечает за три ключевых момента:

  1. Тип компонента - строковый идентификатор, по которому Collector узнаёт наш процессор.

var (  
    strType = component.MustNewType("csvenricherprocessor")  
)
  1. Конфигурация по умолчанию - значения, которые будут использоваться, если параметры не заданы в YAML.

func createDefaultConfig() component.Config {
	return &Config{
		MatchField:    "SolObjectID", // дефолтное поле для поиска
		EnrichColumns: []string{"trace_code", "another_code"}, // дефолтные колонки для обогащения
    }
}
  1. Создание самого процессора - функция, которая принимает контекст, конфигурацию и nextConsumer (следующий в цепочке компонент), а возвращает рабочий экземпляр процессора.

   func createTracesProcessor(
	   ctx context.Context,
	   set processor.Settings,
	   cfg component.Config,
	   nextConsumer consumer.Traces,
	) (processor.Traces, error) {
	
		processorCfg, ok := cfg.(*Config)
		if !ok {
	        return nil, fmt.Errorf("configuration parsing error")
	    }
	
	    // создаём наш процессор
	    proc, err := newProcessor(processorCfg, set.Logger)
	    if err != nil {
	        return nil, fmt.Errorf("cannot create csvenricher processor: %w", err)
	    }

	    // оборачиваем в хелпер
	    return processorhelper.NewTraces(
	        ctx,
	        set,
	        cfg,
	        nextConsumer,
	        proc.processTraces,
	        processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
	        processorhelper.WithStart(proc.start),
	        processorhelper.WithShutdown(proc.shutdown),
	    )
	}
  1. Всё это объединяется в фабрике:

   func NewFactory() processor.Factory {
	   return processor.NewFactory(
		   strType,
		   createDefaultConfig,
		   processor.WithTraces(createTracesProcessor, stability),
		)
   }

Таким образом, Factory:

  • регистрирует наш процессор в Collector (под уникальным именем csvenricherprocessor);

  • гарантирует, что у процессора всегда есть валидная конфигурация (даже если YAML пустой);

  • описывает, как создать рабочий экземпляр для обработки трейсов;

  • подключает lifecycle-хуки: start, shutdown, processTraces.

Без фабрики Collector просто не сможет «узнать» о существовании процессора и включить его в пайплайн.

3. Processor (processor.go)

Processor - это место, где реализуется бизнес-логика обработки данных.
В отличие от Factory (которая только регистрирует и создаёт компонент), Processor отвечает за всё: загрузку данных, обработку входящих спанов и graceful shutdown.

Разберём ключевые части.

Структура процессора

type csvEnricherProcessor struct {  
    logger     *zap.Logger  
    config     *Config  
    csvData    []map[string]string  // мапа с данными из таблицы
    matchIndex map[string]int // индекс для поиска по значению `MatchField` 
    mu         sync.RWMutex  
  
    ticker   *time.Ticker  
    stopChan chan struct{}  
}

Что тут есть:

  • logger - для логирования (стандартная практика в Collector).

  • config - ссылка на Config, чтобы знать, где искать CSV, какое поле использовать и т.д.

  • csvData и matchIndex - подготовленные данные из CSV для быстрого поиска.

  • mu - sync.RWMutex для потокобезопасного доступа к данным (Collector обрабатывает данные конкурентно).

  • ticker и stopChan - управление циклом периодической перезагрузки CSV.

Жизненный цикл

Processor должен уметь запускаться и корректно завершаться:

start() - поднимает бэкграунд-процесс для регулярной перезагрузки CSV.


func (p *csvEnricherProcessor) start(ctx context.Context, host component.Host) error {
	p.logger.Info("Starting CSV Enricher Processor")

	// Если интервал <= 0, просто загружаем один раз и выходим
	if p.config.ReloadInterval <= 0 {
		...
		return nil
	}

	p.logger.Info("Starting background CSV reload loop",
		zap.Duration("interval", p.config.ReloadInterval))
	go func() {
		ticker := time.NewTicker(p.config.ReloadInterval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				p.logger.Info("Reloading CSV data")
				if err := p.loadCSVData(); err != nil {
					p.logger.Warn("Failed to reload CSV data", zap.Error(err))
				}
			case <-ctx.Done():
				p.logger.Info("CSV reload context done, stopping reload loop")
				return
			}
		}
	}()

	return nil
}

shutdown() - останавливает тикеры и чистит ресурсы.

func (p *csvEnricherProcessor) shutdown(ctx context.Context) error {  
    p.logger.Info("Shutting Down CSV Enricher Processor")  
  
    if p.ticker != nil {  
       p.ticker.Stop()  
    }  
    close(p.stopChan)  
  
    return nil  
}

loadCSVData() - загрузка данных из csv в мапу

func (p *csvEnricherProcessor) loadCSVData() error {
	file, err := os.Open(p.config.CSVPath)
	...
	records, err := reader.ReadAll()
	...
	// headers + построение индекса по MatchField 
}

Тут алгоритм такой:

  • Открываем файл.

  • Читаем все строки.

  • Берём заголовок (header).

  • Строим map[matchValue] → индекс строки для быстрого поиска.

Обработка трейсов

Collector передаёт процессору батч трейсов через функцию processTraces.

func (p *csvEnricherProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {  
    p.mu.RLock()  
    defer p.mu.RUnlock()  
  
    resourceSpans := td.ResourceSpans()  
    for i := 0; i < resourceSpans.Len(); i++ {  
       resourceSpan := resourceSpans.At(i)  
  
       // enrichment на уровне resource  
       resourceAttrs := resourceSpan.Resource().Attributes()  
       p.enrichResource(resourceAttrs)  

    }  
    return td, nil  
}

В нашем случае мы решили обогащать атрибуты ресурса (Resource.Attributes), а не отдельные спаны.

Почему так:

  • ресурс привязан к сервису/экземпляру, и логично обогащать его.

  • меньше дублирования - обогащение на уровне Resource применяется ко всем спанам внутри.

Логика enrichment

func (p *csvEnricherProcessor) enrichResource(resourceAttrs pcommon.Map) {
	matchValue, exists := resourceAttrs.Get(p.config.MatchField)
	if !exists || matchValue.Type() != pcommon.ValueTypeStr {  
	    return  
	}
	...
	recordIdx, found := p.matchIndex[matchValue.Str()]
	...
	for _, column := range p.config.EnrichColumns {
		if value, ok := record[column]; ok {
			resourceAttrs.PutStr(column, value)
		}
	}
}

Если в ресурсных атрибутах есть поле MatchField, мы ищем его в CSV и добавляем все указанные колонки как новые атрибуты.

Итого, Processor:

  • управляет данными (CSV → память → быстрый поиск);

  • добавляет атрибуты к ресурсам;

  • живёт по жизненному циклу (start/shutdown);

  • потокобезопасен (sync.RWMutex);

  • умеет обновлять данные «на лету» без рестарта Collector.

Как встроить процессор в Collector

Чтобы кастомный процессор оказался в финальном бинарнике Collector, нужно пересобрать его с помощью otelcol‑builder. Подробно про сборку бинарника можно прочитать в официальной документации

Мы же готовим Docker Image, поэтому опишу, как собрать кастомный образ.

Шаг 1. Настройка builder-config.yaml

В файле builder-config.yaml в конец блока processors добавить ссылку на репозиторий с кодом кастомного процессора, версия релиза обязательна.

processors:
  - gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.128.0
  - gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.128.0
  ...
  # наш процессор
  - gomod: github.com/hiphopzeliboba/csvenricherprocessorr v0.3.0

При разработке процессора или его тестировании, можно использовать replaces и подключить локальный пакет:

   replaces:
     - github.com/hiphopzeliboba/csvenricherprocessor => /path/to/local/csvenricherprocessor

В файле builder-config.yaml указать корректный name и output_path.
output_path должен соответствовать пути ENTRYPOINT ["/otelcol-contrib"] в Dockerfile

   dist:
     module: github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontribcol
     name: otelcol-contrib
     decription: Local OpenTelemetry Collector Contrib binary, testing only.
     version: 0.128.0-dev
     output_path: ./otelcol-contrib

Шаг 2. Сборка Docker Image

Пример готового Doсkerfile можно найти тут

    docker build -t opentelemetry-collector-contrib-custom:1.128.4 .

Шаг 3. Запускаем Collector (локально, для тестов)

Тут мы пробрасываем необходимый collector-config.yaml и монтируем enrich.csv внутрь контейнера

  docker run -it --rm \
  -v $(pwd)/enrich.csv:/data/enrich.csv \
  -v $(pwd)/collector-config.yaml:/otelcol/collector-config.yaml \
  -p 4317:4317 -p 4318:4318 -p 8888:8888 \
  --name otelcol-custom opentelemetry-collector-contrib-custom:0.128.4

На этом этапе всё должно заработать :-)

Тестирование

Чтобы проверить работу процессора, я написал маленькую утилиту test_tracer на Go. Она отправляет тестовые спаны с нужными атрибутами, чтобы убедиться, что обогащение работает. (Работает на стандартных портах Collector)

cd test_tracer/
go mod tidy
go run tracer.go

Итог

Мы реализовали кастомный процессор для OpenTelemetry Collector, который:

  • читает данные из CSV;

  • сопоставляет их по ключу с атрибутами спана;

  • добавляет новые поля в трассировки;

  • обновляет справочник без перезапуска Collector.

Благодаря модульной архитектуре Collector, сделать такой процессор оказалось не так сложно - главное понять три кита: Config, Factory и Processor.

Кастомные компоненты - это удобный способ расширить возможности OpenTelemetry Collector под свои бизнес‑нужды.
Если стандартных компонентов не хватает, можно писать свои: достаточно один раз разобраться в структуре, и дальше получится быстро собирать собственные «кирпичики» для Observability.

Надеюсь, эта статья будет полезна и поможет вам сэкономить кучу времени, экспериментов и поисков по коду contrib‑репозитория :-).

Репозиторий c проектом

Полный код проекта доступен на GitHub:
hiphopzeliboba/otel-collector-contrib-custom-processor

Комментарии (0)


  1. olku
    18.09.2025 07:34

    Спасибо за интересную статью. Не подскажете как проще всего в спан добавить версию/тег образа?


    1. hiphopzeliboba Автор
      18.09.2025 07:34

      Если используется k8s, можно посмотреть в сторону:
      https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/k8sattributesprocessor

      А вообще, наверно, через встроенный процессор, что-то вроде этого:

      processors:
        resource:
          attributes:
            - key: otel.collector.version
              value: "v0.128.0-custom"
              action: insert