Привет! Я Артём, скромный платформенный инженер: код пишу, метрики смотрю и иногда даже понимаю, что происходит. В работе мне часто приходится сталкиваться с Observability‑инструментами. Одним из таких инструментов, о котором я хотел бы рассказать - OpenTelemetry Collector. Это мощный инструмент, который позволяет работать с различной телеметрией и строить гибкие пайплайны для метрик, логов и трейсов.
Но иногда возможностей стандартного набора компонентов не хватает, чтобы справиться с поставленными задачами без использования костылей и изоленты. Тогда на сцену выходят кастомные компоненты для Otel-Collector.

Содержание
Введение
В стандартном наборе opentelemetry-collector-contrib
много готовых процессоров: фильтрация, нормализация, лимит памяти и так далее. Но у нас возникла задача: обогащать спаны данными из внешнего справочника (в нашем случае выбор пал на CSV‑файл).
Например, у нас есть спан с атрибутом SolObjectID
.
А в CSV лежит таблица, где этому SolObjectID
соответствует поле trace_code
, очень нужное нам, по каким-то причинам. Хочется, чтобы в трейсе появлялся этот trace_code
без дополнительного изменения кода в сервисах. К сожалению, готовых решений в contrib версии OpenTelemetry Collector не предусмотрено, однако, такой функционал очень даже хочется иметь. Взять и написать свой процессор - кажется хорошей идеей.
В этой статье я расскажу про свой опыт написания кастомного процессора для otel-collector. Он обогащает спаны данными из CSV‑файла: находит совпадение по атрибуту и добавляет дополнительные поля прямо внутрь трейсов. Мы разберём архитектуру процессора, посмотрим код и конфигурацию, а в конце покажу, как собрать и запустить Collector с этим расширением.
Архитектура кастомного процессора
Любой компонент в Collector (receiver, processor, exporter, connector) устроен по одному принципу: у него есть Config, Factory и сама логика (в нашем случае Processor).
1. Config (config.go)
Config описывает, какие параметры мы можем задать в YAML‑конфигурации Collector.
В OpenTelemetry Collector каждый компонент имеет свой конфиг. Collector ожидает, что этот конфиг будет реализовывать интерфейс component.Config
.
Collector не знает заранее, какие именно поля есть у нашего конфига. Но он знает, что любой конфиг - это объект, который реализует интерфейс component.Config
. Это позволяет Collector работать с любым компонентом одинаково.
Фрагмент Config кода:
type Config struct {
CSVPath string `mapstructure:"csv_path"`
MatchField string `mapstructure:"match_field"`
EnrichColumns []string `mapstructure:"enrich_columns"`
ReloadInterval time.Duration `mapstructure:"reload_interval"`
}
var _ component.Config = (*Config)(nil)
Что здесь важно:
CSVPath
- путь к CSV‑файлу.MatchField
- атрибут спана, по которому ищем совпадения.EnrichColumns
- список колонок, которые нужно добавить в спан.ReloadInterval
- как часто обновлять данные из CSV без перезапуска Collector.
Пример использования в collector-config.yaml
:
processors:
csvenricherprocessor:
csv_path: "/etc/mapping.csv"
match_field: "SolObjectID"
enrich_columns: [ "trace_code", "product_id" ]
reload_interval: "5m"
2. Factory (factory.go)
Factory - это «фабрика» для создания процессора. Collector работает по контракту: каждый компонент обязан предоставить фабрику, чтобы Collector мог его корректно зарегистрировать и собрать в пайплайн.
Фабрика отвечает за три ключевых момента:
Тип компонента - строковый идентификатор, по которому Collector узнаёт наш процессор.
var (
strType = component.MustNewType("csvenricherprocessor")
)
Конфигурация по умолчанию - значения, которые будут использоваться, если параметры не заданы в YAML.
func createDefaultConfig() component.Config {
return &Config{
MatchField: "SolObjectID", // дефолтное поле для поиска
EnrichColumns: []string{"trace_code", "another_code"}, // дефолтные колонки для обогащения
}
}
Создание самого процессора - функция, которая принимает контекст, конфигурацию и
nextConsumer
(следующий в цепочке компонент), а возвращает рабочий экземпляр процессора.
func createTracesProcessor(
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextConsumer consumer.Traces,
) (processor.Traces, error) {
processorCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("configuration parsing error")
}
// создаём наш процессор
proc, err := newProcessor(processorCfg, set.Logger)
if err != nil {
return nil, fmt.Errorf("cannot create csvenricher processor: %w", err)
}
// оборачиваем в хелпер
return processorhelper.NewTraces(
ctx,
set,
cfg,
nextConsumer,
proc.processTraces,
processorhelper.WithCapabilities(consumer.Capabilities{MutatesData: true}),
processorhelper.WithStart(proc.start),
processorhelper.WithShutdown(proc.shutdown),
)
}
Всё это объединяется в фабрике:
func NewFactory() processor.Factory {
return processor.NewFactory(
strType,
createDefaultConfig,
processor.WithTraces(createTracesProcessor, stability),
)
}
Таким образом, Factory:
регистрирует наш процессор в Collector (под уникальным именем csvenricherprocessor);
гарантирует, что у процессора всегда есть валидная конфигурация (даже если YAML пустой);
описывает, как создать рабочий экземпляр для обработки трейсов;
подключает lifecycle-хуки: start, shutdown, processTraces.
Без фабрики Collector просто не сможет «узнать» о существовании процессора и включить его в пайплайн.
3. Processor (processor.go)
Processor - это место, где реализуется бизнес-логика обработки данных.
В отличие от Factory (которая только регистрирует и создаёт компонент), Processor отвечает за всё: загрузку данных, обработку входящих спанов и graceful shutdown.
Разберём ключевые части.
Структура процессора
type csvEnricherProcessor struct {
logger *zap.Logger
config *Config
csvData []map[string]string // мапа с данными из таблицы
matchIndex map[string]int // индекс для поиска по значению `MatchField`
mu sync.RWMutex
ticker *time.Ticker
stopChan chan struct{}
}
Что тут есть:
logger
- для логирования (стандартная практика в Collector).config
- ссылка на Config, чтобы знать, где искать CSV, какое поле использовать и т.д.csvData
иmatchIndex
- подготовленные данные из CSV для быстрого поиска.mu
-sync.RWMutex
для потокобезопасного доступа к данным (Collector обрабатывает данные конкурентно).ticker
иstopChan
- управление циклом периодической перезагрузки CSV.
Жизненный цикл
Processor должен уметь запускаться и корректно завершаться:
start()
- поднимает бэкграунд-процесс для регулярной перезагрузки CSV.
func (p *csvEnricherProcessor) start(ctx context.Context, host component.Host) error {
p.logger.Info("Starting CSV Enricher Processor")
// Если интервал <= 0, просто загружаем один раз и выходим
if p.config.ReloadInterval <= 0 {
...
return nil
}
p.logger.Info("Starting background CSV reload loop",
zap.Duration("interval", p.config.ReloadInterval))
go func() {
ticker := time.NewTicker(p.config.ReloadInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
p.logger.Info("Reloading CSV data")
if err := p.loadCSVData(); err != nil {
p.logger.Warn("Failed to reload CSV data", zap.Error(err))
}
case <-ctx.Done():
p.logger.Info("CSV reload context done, stopping reload loop")
return
}
}
}()
return nil
}
shutdown()
- останавливает тикеры и чистит ресурсы.
func (p *csvEnricherProcessor) shutdown(ctx context.Context) error {
p.logger.Info("Shutting Down CSV Enricher Processor")
if p.ticker != nil {
p.ticker.Stop()
}
close(p.stopChan)
return nil
}
loadCSVData()
- загрузка данных из csv в мапу
func (p *csvEnricherProcessor) loadCSVData() error {
file, err := os.Open(p.config.CSVPath)
...
records, err := reader.ReadAll()
...
// headers + построение индекса по MatchField
}
Тут алгоритм такой:
Открываем файл.
Читаем все строки.
Берём заголовок (header).
Строим
map[matchValue]
→ индекс строки для быстрого поиска.
Обработка трейсов
Collector передаёт процессору батч трейсов через функцию processTraces
.
func (p *csvEnricherProcessor) processTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) {
p.mu.RLock()
defer p.mu.RUnlock()
resourceSpans := td.ResourceSpans()
for i := 0; i < resourceSpans.Len(); i++ {
resourceSpan := resourceSpans.At(i)
// enrichment на уровне resource
resourceAttrs := resourceSpan.Resource().Attributes()
p.enrichResource(resourceAttrs)
}
return td, nil
}
В нашем случае мы решили обогащать атрибуты ресурса (Resource.Attributes
), а не отдельные спаны.
Почему так:
ресурс привязан к сервису/экземпляру, и логично обогащать его.
меньше дублирования - обогащение на уровне Resource применяется ко всем спанам внутри.
Логика enrichment
func (p *csvEnricherProcessor) enrichResource(resourceAttrs pcommon.Map) {
matchValue, exists := resourceAttrs.Get(p.config.MatchField)
if !exists || matchValue.Type() != pcommon.ValueTypeStr {
return
}
...
recordIdx, found := p.matchIndex[matchValue.Str()]
...
for _, column := range p.config.EnrichColumns {
if value, ok := record[column]; ok {
resourceAttrs.PutStr(column, value)
}
}
}
Если в ресурсных атрибутах есть поле MatchField
, мы ищем его в CSV и добавляем все указанные колонки как новые атрибуты.
Итого, Processor:
управляет данными (CSV → память → быстрый поиск);
добавляет атрибуты к ресурсам;
живёт по жизненному циклу (start/shutdown);
потокобезопасен (
sync.RWMutex
);умеет обновлять данные «на лету» без рестарта Collector.
Как встроить процессор в Collector
Чтобы кастомный процессор оказался в финальном бинарнике Collector, нужно пересобрать его с помощью otelcol‑builder. Подробно про сборку бинарника можно прочитать в официальной документации
Мы же готовим Docker Image, поэтому опишу, как собрать кастомный образ.
Шаг 1. Настройка builder-config.yaml
В файле builder-config.yaml
в конец блока processors
добавить ссылку на репозиторий с кодом кастомного процессора, версия релиза обязательна.
processors:
- gomod: go.opentelemetry.io/collector/processor/batchprocessor v0.128.0
- gomod: go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.128.0
...
# наш процессор
- gomod: github.com/hiphopzeliboba/csvenricherprocessorr v0.3.0
При разработке процессора или его тестировании, можно использовать replaces
и подключить локальный пакет:
replaces:
- github.com/hiphopzeliboba/csvenricherprocessor => /path/to/local/csvenricherprocessor
В файле builder-config.yaml
указать корректный name
и output_path
.
output_path
должен соответствовать пути ENTRYPOINT ["/otelcol-contrib"]
в Dockerfile
dist:
module: github.com/open-telemetry/opentelemetry-collector-contrib/cmd/otelcontribcol
name: otelcol-contrib
decription: Local OpenTelemetry Collector Contrib binary, testing only.
version: 0.128.0-dev
output_path: ./otelcol-contrib
Шаг 2. Сборка Docker Image
Пример готового Doсkerfile можно найти тут
docker build -t opentelemetry-collector-contrib-custom:1.128.4 .
Шаг 3. Запускаем Collector (локально, для тестов)
Тут мы пробрасываем необходимый collector-config.yaml
и монтируем enrich.csv
внутрь контейнера
docker run -it --rm \
-v $(pwd)/enrich.csv:/data/enrich.csv \
-v $(pwd)/collector-config.yaml:/otelcol/collector-config.yaml \
-p 4317:4317 -p 4318:4318 -p 8888:8888 \
--name otelcol-custom opentelemetry-collector-contrib-custom:0.128.4
На этом этапе всё должно заработать :-)
Тестирование
Чтобы проверить работу процессора, я написал маленькую утилиту test_tracer
на Go. Она отправляет тестовые спаны с нужными атрибутами, чтобы убедиться, что обогащение работает. (Работает на стандартных портах Collector)
cd test_tracer/
go mod tidy
go run tracer.go
Итог
Мы реализовали кастомный процессор для OpenTelemetry Collector, который:
читает данные из CSV;
сопоставляет их по ключу с атрибутами спана;
добавляет новые поля в трассировки;
обновляет справочник без перезапуска Collector.
Благодаря модульной архитектуре Collector, сделать такой процессор оказалось не так сложно - главное понять три кита: Config, Factory и Processor.
Кастомные компоненты - это удобный способ расширить возможности OpenTelemetry Collector под свои бизнес‑нужды.
Если стандартных компонентов не хватает, можно писать свои: достаточно один раз разобраться в структуре, и дальше получится быстро собирать собственные «кирпичики» для Observability.Надеюсь, эта статья будет полезна и поможет вам сэкономить кучу времени, экспериментов и поисков по коду contrib‑репозитория :-).
Репозиторий c проектом
Полный код проекта доступен на GitHub:
hiphopzeliboba/otel-collector-contrib-custom-processor
olku
Спасибо за интересную статью. Не подскажете как проще всего в спан добавить версию/тег образа?
hiphopzeliboba Автор
Если используется k8s, можно посмотреть в сторону:
https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/processor/k8sattributesprocessor
А вообще, наверно, через встроенный процессор, что-то вроде этого: