В этой статье я поделюсь с вами своим опытом использования Golang, Kafka и Clickhouse на примере простого ETL-конвейера для параллельной передачи JSON-данных в базу данных с последующим прогнозированием температуры на основе машинного обучения

В продолжение двух моих предыдущих статей о Kafka и Clickhouse я решил написать небольшое приложение, демонстрирующее, как мы можем передавать данные в реальном времени через Kafka с проверкой и преобразованием в Clickhouse, чтобы затем сформировать на их основе временные ряды для машинного обучения — в частности, как мы можем использовать метеорологические данные прошлых лет для прогнозирования температуры. Признаюсь честно, я не очень хорошо разбираюсь в математике, стоящей за машинным обучением, и могу поделиться лишь наблюдениями, полученными методом проб и ошибок, но итоговая модель оказалась куда более точной, чем я мог ожидать.

Пара слов о задействованных технологиях:

Kafka — это распределенная система обмена сообщениями, поддерживающая множество производителей(producers)/потребителей(consumers). Брокер Kafka разбивает "топики", на которые собственно и можно подписываться/публиковать, на несколько разделов (партиций), которые реплицируются по всему кластеру. Хотя в ней нет встроенных ETL-инструментов, как в Apache Spark, Kafka обладает удивительно низкой задержкой и может передавать данные из нескольких источников. Для управления кластером я использую Zookeeper. Эта программа считывает данные из файла, так как сейчас это проще контролировать. В идеале же мы должны передавать в Kafka данные непосредственно из Национальной метеорологической службы и других источников.

Clickhouse — это колоночная реляционная база данных. В дополнение к SQL-подобным командам, она также включает в себя простой метод стохастической линейной регрессии, который мы и будем здесь использовать. Я специально не загадывал и, на самом деле, не ожидал, что метеорологические данные будут линейными, но я хотел, чтобы часть с машинным обучением была достаточно простой и выполнялась исключительно в Clickhouse.

Оба эти инструмента имеют открытый исходный код и бесплатны для использования. Я использую их образы для Docker: https://hub.docker.com/r/wurstmeister/kafka и https://hub.docker.com/r/clickhouse/clickhouse-server/. Я также использую API segmentio/kafka-go и clickhouse/clickhouse-go для Go.

Go я решил использовать из-за его простой модели параллелизма, на я хотел обратить ваше внимание в этом проекте. Каналы в Go обеспечивают синхронизацию между потоками (горутинами) и позволяют предотвратить состояния гонки без явного использования блокировок и мьютексов (которые также присутствуют в Go), сохраняя при этом гибкость и эффективность параллельного выполнения. Если вам интересно узнать об этом больше, то вот простой практический пример использования каналов.

Архитектура:

По сути, мы разделяем извлечение, преобразование и загрузку данных на несколько горутин, которые передают данные между каждым этапом по каналам. Полученные из Kafka данные в виде JSON-строк передаются по каналу в структуру (для проверки и распаковки), которая в свою очередь передается по другому каналу в Clickhouse. Структура здесь служит вместо схемы для проверки типа данных каждого поля.

Мы считываем данные из Kafka посредством групп потребителей (а не читателей из партиций), чтобы иметь большую гибкость в количестве потоков. Внутренний брокер Kafka позаботится о смещениях для каждой партиции, чтобы гарантировать, что мы прочитаем каждое сообщение только один раз, даже из параллельных потоков.

Затем данные преобразуются в Clickhouse, ведь мы должны подготовить их для временных рядов машинного обучения. Мы прогнозируем температуру для данного часа на основе температур предыдущих часов, поэтому мы организуем данные таким образом, чтобы их можно было легко ввести в метод машинного обучения Clickhouse, который определяет веса каждого параметра на основе соответствующих столбцов. Мы используем окно из последних 20 температур с их временными смещениями от начала окна в дополнение к месяцу, чтобы определить, какой будет следующая температура на любую заданную дату. Вдохновение для этого примера я почерпнул из статьи https://towardsdatascience.com/ml-approaches-for-time-series-4d44722e48fe.

Реализация:

set-up.sh (ETL)

#!/user/bin/bash

topic="temperature_data"
echo "kafka-topics.sh --delete  --topic ${topic} --bootstrap-server localhost:9092" |docker exec -i kafka /bin/bash
echo "kafka-topics.sh --create --topic ${topic} --partitions 10 --bootstrap-server localhost:9092" |docker exec -i kafka /bin/bash


table="temperatures"
# echo "show tables;" | docker exec -i clickhouse clickhouse-client

echo "truncate table if exists ${table};" |docker exec -i clickhouse clickhouse-client
ch_createtable="create table if not exists ${table} (
 Date_time DateTime('America/Los_Angeles'),
 Air_temp Float32,
 ) engine=MergeTree() ORDER BY Date_time;"

 # "create table if not exists ${table} (
 # Date_time DateTime('America/Los_Angeles'),
 # Sea_level_pressure Float32,
 # Altimeter Float32,
 # Air_temp Float32,
 # Relative_humidity Float32,
 # Dew_point_temperature Float32,
 # Wind_direction UInt16,
 # Wind_speed Float32,) engine=MergeTree() ORDER BY Date_time;"

echo ${ch_createtable} | docker exec -i clickhouse clickhouse-client

Создайте топик Kafka с таким количеством партиций, сколько вы планируете запускать горутин, читающих из Kafka. Затем создайте таблицу в Clickhouse для хранения данных температур, которые мы позже будем считывать из Kafka. Не обязательно указывать часовой пояс в данных, я сохранил его просто, чтобы напомнить себе, как это делается. В качестве первичного ключа для упорядочивания данных мы установим дату.

json-to-kafka.go (ETL)

Метеорологические данные, которые я загрузил, были в формате csv, поэтому я прогнал их через онлайн-конвертер и получил файл в формате json (https://csvjson.com/csv2json).

// https://www.weather.gov/lox/observations_historical
// https://csvjson.com/csv2json
// https://pkg.go.dev/github.com/segmentio/kafka-go#section-documentation
// https://pkg.go.dev/encoding/json#Marshal

package main

import (
   "github.com/segmentio/kafka-go"
   "context"
   "encoding/json"
   "os"
   "fmt"
   "time"
)

type Entry struct {
   Date_time               string
   Sea_level_pressure      float32
   Altimeter               float32
   Air_temp                float32
   Relative_humidity       float32
   Dew_point_temperature   float32
   Wind_direction          uint16
   Wind_speed              float32
}

func main() {
   const topic = "temperature_data"
   const fileName = "KLAX-data_temp-only.json"
   var read_data []byte
   var entries []Entry
   var limit int = 10000
   done_ch:=make(chan bool)
   var threads int= 10

   read_data = readData(fileName)
   json.Unmarshal(read_data, &entries) //формируем слайс со структурами из json

   fmt.Printf("How many entries to use? (/%d)", len(entries))
   fmt.Scan(&limit)
   entries = entries[:limit]

   for i:=0; i<threads; i++ {
       go writeMessages(entries, threads, i, topic, done_ch) //записываем в топик

   }

   for i:=0; i<threads; i++{ //ждем, пока все потоки не завершатся

       <-done_ch
   }
} //конец main

//читаем заданный файл и выводим []byte его содержимого

func readData(fileName string) ([]byte){
   data, err := os.ReadFile(fileName)
   if err != nil {
       panic(err)
   }
   return data
}
//продолжение ниже...

Как упоминалось ранее, мы можем использовать структуру для валидации типов данных каждой записи. Невалидные поля отбрасываются. Написание названий полей с заглавной буквы обязательно  — хоть в этой программе это ничего и не нарушает, в последнем своем коде я столкнулся с проблемами из-за того, что поля не были экспортированы должным образом.

В любом случае, мы можем использовать метод Unmarshal из пакета json для преобразования json-файла в слайс нашей структуры "Entry". После обрезки слайса до нужной длины, мы можем воспользоваться горутинами, чтобы равномерно распределить работу по записи в топик Kafka. Каждая горутина открывает Writer, который запишет все доставшиеся ей после распаковки структур записи обратно в json-строки.

//записываем свою часть слайса Entry в топик и подаем сигнал о завершении

func writeMessages(entries []Entry, threads int, thread_num int, topic string, done_ch chan bool){
   //смоделировано на основе документации
   writer := &kafka.Writer{
       Addr:   kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
       Topic:  topic,
       Balancer: &kafka.LeastBytes{}, //равномерно распределяем сообщения по партициям

       BatchTimeout: 10 * time.Millisecond, //по умолчанию 1 секунда

   }
   var batch_size int = 0
   msgs := []kafka.Message{}

   var count int = 0
   for i:=thread_num; i < len(entries); i+=threads {
       count++
       //преобразуем из структуры обратно в json
       entry_bytes, _ := json.Marshal(entries[i]) //формируем в json, как []byte

       batch_size++
       if batch_size <1000 { //отправка пакетов отдельными сообщениями является слишком медленной
           msgs = append(msgs, kafka.Message{ Value:   entry_bytes,})
       }else{
           batch_size = 0
           //https://stackoverflow.com/questions/37570665/what-does-mean-when-coming-directly-after-a-slice
           err := writer.WriteMessages(context.Background(), msgs...)
           if err!= nil {
               panic(err)
           }
           msgs = []kafka.Message{}
           msgs = append(msgs, kafka.Message{ Value:   entry_bytes,})
       }
   }
   err := writer.WriteMessages(context.Background(), msgs...) //запись оставшейся части последнего батча

   fmt.Printf("WROTE(%d): %v\n", count, thread_num)

   if err = writer.Close(); err != nil {
       panic(err)
   }
   done_ch <- true //сигнал о завершении записи
}

Следует отметить два ключевых момента:

  • Не стоит использовать слишком маленькие батчи.

  • По умолчанию батчи удаляются каждую секунду (опция BatchTimeout).

Если вы проигнорируете эти моменты, то запись будет очень медленной. Давайте сравним, что у нас получится для 10 потоков, которые должны записать по 1000 сообщений:

go run json-to-kafka.go 0.60s user 0.58s system 10% cpu 11.726 total

при тайм-ауте по умолчанию в 1 секунду и 100 сообщений в батче, и:

go run json-to-kafka.go 0.52s user 0.53s system 63% cpu 1.659 total

при тайм-ауте в 10 миллисекунд и 1000 сообщений на батч. А теперь представьте себе ожидание записи 100 000 сообщений с первым набором параметров (это около 15 минут :( ).

Мы сигнализируем main о завершении записи каждой горутиной с помощью каналов.

kafka-to-ch.go (ETL)

Это основная часть нашего ETL-конвейера.

Пока наши метеорологические данные записываются в Kafka, мы можем начать считывать их из Kafka для записи в базу данных. Здесь, чтобы не усложнять пример, я распарсил в структуру только поля с датой и температурой, поскольку в исходных метеорологических данных довольно много строк, не содержащих одного или нескольких игнорируемых полей, из-за чего программа отбрасывает эти записи при валидации.

// https://pkg.go.dev/github.com/ClickHouse/clickhouse-go/v2
// https://pkg.go.dev/os/signal

package main

import (
   "fmt"
   "github.com/ClickHouse/clickhouse-go/v2"
   "github.com/segmentio/kafka-go"
   "time"
   "context"
   "encoding/json"
   "errors"
   "os/signal"
   "os"
)

// json-to-kafka.go
type Entry struct {
   Date_time               string
   // Sea_level_pressure       float32
   // Altimeter                float32

   Air_temp                float32
   // Relative_humidity        float32
   // Dew_point_temperature    float32
   // Wind_direction           uint16
   // Wind_speed               float32

}

func main() {
   conn := connect_ch() //устанавливаем соединение с clickhouse

   const number_routines_kafka = 10
   const number_routines_ch = 10
   const number_routines_tr = 10
   const topic = "temperature_data"
   const table = "temperatures"
   var readers []*kafka.Reader = []*kafka.Reader{}

   signal_ch := make(chan os.Signal)
   msgs := make(chan []byte)
   entries := make(chan Entry)

   for i:=0; i<number_routines_kafka; i++ {
       reader := kafka.NewReader(kafka.ReaderConfig{
           Brokers:   []string{"localhost:9092", "localhost:9093", "localhost:9094"},
           GroupID:   "go-consumer-group",
           Topic:     topic,
           MaxBytes:  10e6, //на батч
           QueueCapacity: 1000,
           ReadBatchTimeout: 10 * time.Millisecond,
           // это не все доступные опции
           })
       readers = append(readers, reader)
   }

   go func (signal_ch chan os.Signal){
       signal.Notify(signal_ch, os.Interrupt) // ловим нажатие Ctrl-C

   } (signal_ch)



   go func (readers []*kafka.Reader, msgs chan []byte){
       for i:=0; i<number_routines_kafka; i++ {
           go readWithReader(readers[i], msgs, i) // чтение из Kafka
       }
   } (readers, msgs)

   for i:=0; i<number_routines_tr; i++ {
       go transform(msgs, i, entries) // валидация данных
   }

   for i:=0; i<number_routines_ch; i++ {
       go batchInsert(conn, entries, table, i) //запись в Clickhouse

   }

   <-signal_ch //висит до тех пор, пока соединения не будут готовы к очистке

   for i,reader := range readers{
       fmt.Printf("Closing reader: %d\n",i)
       if err := reader.Close(); err != nil {
           fmt.Println("failed to close reader:", err)
       }
   }
   if err := conn.Close(); err != nil {
       fmt.Println("failed to close clickhouse connection:", err)
   }

   fmt.Println("yay")
} //конец main

//Подключение к Clickhouse
func connect_ch() (clickhouse.Conn) {
   conn, err := clickhouse.Open(&clickhouse.Options{
       Addr: []string{fmt.Sprintf("%s:%d", "127.0.0.1", 19000)},
       Auth: clickhouse.Auth{
           Database: "default",
           Username: "default",
           Password: "",
       },
       Debug: false,
   })
   if err != nil {
       panic(err)
   }
   fmt.Println(conn.Ping(context.Background()))
   v, err := conn.ServerVersion()
   fmt.Println(v)
   if err != nil {
       panic(err)
   }
   return conn
}

//Получаем сообщения как группа потребителей и отправляем их по каналу []byte

func readWithReader(reader *kafka.Reader, ch chan []byte, routine_num int){
   var count int = 0
   for {
       entry, err := reader.ReadMessage(context.Background())
       if err != nil {
           break
       }
       ch <- entry.Value
       count++
       fmt.Printf("READ(%d): %v\n", count, routine_num)
   }
   fmt.Printf("DONE READ(%d): %v\n", count, routine_num)
}
//продолжение ниже...

Эта программа немного сложнее предыдущих. Вот краткий обзор того, что здесь происходит:

  • В горутинах readWithReader() используются несколько читателей Kafka одной и той же группы потребителей — они передают необработанные сообщения по каналу msgs в виде массива байтов. В то же время горутины transform() считывают из этого канала каждое сообщение, чтобы валидировать и преобразовать его в структуру Entry для передачи через канал entries в конечный набор горутин, batchInsert(), где они будут внесены в Clickhouse.

  • В программе также запущена горутина, которая ловит сигнал ctrl-c (SIGINT) и закрывает соединение читателей и Clickhouse для изящного завершения работы. После создания всех горутин, main остановится в ожидании, пока этот сигнал пройдет по каналу signal_ch, прежде чем приступить к очистке.

//(скопировано из main)
go func (signal_ch chan os.Signal){
       signal.Notify(signal_ch, os.Interrupt) // ловим нажатие клавиши Ctrl-C

   } (signal_ch)

//...

<-signal_ch // ожидает до тех пор, пока соединения не будут готовы к очистке

   for i,reader := range readers{
       fmt.Printf("Closing reader: %d\n",i)
       if err := reader.Close(); err != nil {
           fmt.Println("failed to close reader:", err)
       }
   }
// остальная часть очистки...

Следует отметить несколько моментов:

  • При запуске этой программы будет автоматически создана группа потребителей читателя, если она еще не была создана. Вы можете запустить

kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --all-groups

в shell запущенного контейнера kafka, чтобы увидеть смещения сообщений, потребляемых из топика.

  • Валидация на этапе преобразования заключается исключительно в проверке того, что нужные нам поля ненулевые (отсутствующие поля при преобразовании инициализируются умолчательными значениями). В зависимости от данных, некоторые записи могут быть отброшены ошибочно (например, если в исходных данных есть температура 0 градусов).

//Разбиваем сообщения из канала []byte на Entries и проверяем их
//перед отправкой их по каналу Entry
func transform(ch chan []byte, routine_num int, entries chan Entry){
   var count int = 0
   for val := range ch{
       var temp Entry
       err := json.Unmarshal(val, &temp) //в структуру
       if err != nil{
           //отмечаем, но двигаемся дальше
           fmt.Println("Error unmarshaling: " + string(val))
           continue
       }
       if validify(&temp) != nil{
           fmt.Printf("Error transforming data: %v\n", temp)
           continue
       }
       entries <- temp
       count++
       fmt.Printf("TRANSFORMED(%d): %v\n", count, routine_num)
   }
}

//Проверяем, что json-строка, преобразованная в структуру, содержит корректные данные (не нули)
func validify(entry *Entry) (error){
   //преобразование даты и времени в то, что clickhouse может преобразовать в DateTime
   res, err := time.Parse("01/02/06-03:04PM", entry.Date_time)
   if err != nil{
       return err
   }
   //если какая-либо запись равна 0 (отсутствующие данные), отбрасываем ее

   // if(entry.Sea_level_pressure==0||entry.Altimeter==0||
   //  entry.Air_temp==0||entry.Relative_humidity==0||entry.Dew_point_temperature==0||
   //  entry.Wind_direction==0||entry.Wind_speed==0){
       if(entry.Air_temp==0){
           return errors.New("invalid entry")
       }
   entry.Date_time = res.Format(time.DateTime)
   return nil
}
// продолжение ниже...
  • Для парсинга нестандартных форматов дат в строках нам пригодится Parse() из пакета time. Вот так будет выглядеть строка, представляющая 3:04 PM 2 января 2006 года в нужном для парсинга формате (например, "01/02/06-03:04PM") (https://pkg.go.dev/time#Parse).

//скопировано
res, err := time.Parse("01/02/06-03:04PM", entry.Date_time)
  • Функция AppendStruct() из API Clickhouse-go поможет нам с разбиением нашей структуры Entry на строки и добавлением их в батч (в качестве альтернативы вызову Append() и явной передачи каждого столбца).

// Вставка батчей в таблицу из канала Entry
func batchInsert(conn clickhouse.Conn, ch chan Entry, table string, routine_num int){
 var count int = 0
 for val := range ch{
       batch, _ := conn.PrepareBatch(context.Background(), "INSERT INTO " + table)
       batch.AppendStruct(&val) // ПОЛЯ СТРУКТУРЫ ДОЛЖНЫ НАЧИНАТЬСЯ С ВЕРХНЕГО РЕГИСТРА, иначе это не будет работать
       count++;
       batch.Send()
       fmt.Printf("INSERTED(%d): %d\n", count, routine_num)
 }
}
// конец программы

Машинное обучение в Clickhouse

predict-set-up.sh (ML)

Вдобавок к таблице, которую мы заполнили на предыдущих этапах, нам нужно сформировать из этих данных таблицу для обучения модели.

Вот как выглядит таблица с нашими данными (это лишь часть таблицы):

А вот часть того, что мы хотим (и будем) скармливать нашей модели (таблица model_data):

В массиве temp хранится окно с предшествующими температурами ( 30 штук) для каждой даты, затем температура конкретной даты, а в массиве time — смещение времени от начала окна для каждой из них. Например, дата 8/8/24 3:53 (первая строка в таблице) имеет в своем массиве temp температуры дат 8/7/24 2:53, 8/7/24 3:53, 8/7/24 4:53 и т. д. Поскольку начало этого диапазона приходится на 8/7/24 2:53, смещение по времени для каждой записи начинается с 0, затем растет на 60 минут (3600) с 2:53 -> 3:53 и т. д. Важно сохранять временные смещения, поскольку, как видно из исходных данных, температура не всегда измерялась через равные промежутки времени.

Чтобы создать эту таблицу, нам необходимо создать вспомогательную таблицу для поиска значений для каждого окна. Вот что нам нужно:

Min_bound представляет собой начало каждого окна. Такой формат позволит легко сгруппировать даты и температуры по каждому окну.

Нам на помощь приходят оконные функции.

-- (скопировано)


MIN(Date_time) OVER (
 ORDER BY Date_time ASC ROWS BETWEEN ${window_size} PRECEDING AND CURRENT ROW
) AS min_time,

Это позволит получить дату записи для window_size записей перед ней. Затем для каждой даты и ее min_time будут добавлены все даты и их температуры, которые попадают в диапазон (используя оператор WHERE). Хотя в результате получается несколько копий каждой даты, каждая из них принадлежит отдельному окну, обозначенному min_bound.

#!/user/bin/bash


window_size=30 #НЕ ИЗМЕНЯТЬ без обновления комманды модели
table="temperatures"


min_bound_table="CREATE TABLE IF NOT EXISTS helper
ENGINE = Memory
AS SELECT
   B.min_time AS min_bound,
   A.Date_time,
   A.Air_temp
FROM
(
   SELECT
       Date_time,
       Air_temp
   FROM ${table}
) AS A,
(
   SELECT
       min_time,
       Date_time,
       Air_temp,
       delta,
       a.Date_time
   FROM
   (
       SELECT
           Date_time,
           Air_temp,
           MIN(Date_time) OVER (ORDER BY Date_time ASC ROWS BETWEEN ${window_size} PRECEDING AND CURRENT ROW) AS min_time,
           Date_time - min_time AS delta
       FROM ${table}
   ) AS a
) AS B
WHERE (A.Date_time >= B.min_time) AND (A.Date_time <= B.Date_time)
ORDER BY Date_time ASC"


echo ${min_bound_table} | docker exec -i clickhouse clickhouse-client
#cont ...

После создания вспомогательной таблицы мы можем использовать GROUP BY min_bound, чтобы разделить записи на соответствующие окна, из которых мы затем можем транспонировать строки в один столбец с помощью groupArray(). В результате мы получим массив температур и временных смещений для каждой даты.

CREATE TABLE IF NOT EXISTS model_data
ENGINE = Memory
AS SELECT
   MAX(Date_time) AS time,
   groupArray(Air_temp) AS temp,
   groupArray(Date_time - min_bound) AS delta_t
FROM helper
GROUP BY min_bound
ORDER BY time ASC

Однако мы должны быть осторожны с самыми ранними 30 записями в нашей итоговой таблице: для заполнения их окон не хватило записей, поэтому вместо них была просто продублирована самая ранняя запись. Эти записи можно идентифицировать по нулевым временным смещениям для записей, выходящих за минимальную границу окна.

ALTER TABLE model_data (DELETE WHERE (delta_t[2]) = 0)

Теперь наши данные представлены в подходящем формате для подачи в нашу модель.

Функция стохастической линейной регрессии Clickhouse принимает размер шага, коэффициент регуляризации, размер батча и метод стохастического градиента, затем столбцы целевого значения, а затем каждый параметр. Любой метод, кроме "Adam", дает NaN, а регуляризация, похоже, не оказывает особого влияния. Я выбрал шаг 0.00004, коэффициент 0.15 и размер батча 3.

step=0.00004 # 0.00050
norm=0.15
batch=3 # 20
method="Adam"
train=75000 # количество записей для обучения
make_model="CREATE TABLE IF NOT EXISTS temp_model
ENGINE = Memory
AS SELECT stochasticLinearRegressionState(${step}, ${norm}, ${batch}, '${method}')
(temp[31], temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[7], temp[8], temp[9], temp[10],
temp[11], temp[12], temp[13], temp[14], temp[15], temp[16], temp[17], temp[18], temp[19], temp[20],
temp[21], temp[22], temp[23], temp[24], temp[25], temp[26], temp[27], temp[28], temp[29], temp[30],
delta_t[1], delta_t[2], delta_t[3], delta_t[4], delta_t[5], delta_t[6], delta_t[7], delta_t[8], delta_t[9], delta_t[10],
delta_t[11], delta_t[12], delta_t[13], delta_t[14], delta_t[15], delta_t[16], delta_t[17], delta_t[18], delta_t[19], delta_t[20],
delta_t[21], delta_t[22], delta_t[23], delta_t[24], delta_t[25], delta_t[26], delta_t[27], delta_t[28], delta_t[29], delta_t[30],
delta_t[31], toMonth(time)) AS state
FROM (SELECT temp, delta_t, time FROM model_data ORDER BY time ASC LIMIT ${train})"


echo ${make_model} | docker exec -i clickhouse clickhouse-client

Я также решил включить месяц в качестве параметра из-за вероятной корреляции между месяцем и температурой. В коде примера я обучаю модель только на первых 75 000 из 100 000 записей.

Теперь проверим модель на полном наборе данных. Последние записи не были частью обучающей выборки.

run_model="WITH (
       SELECT state
       FROM temp_model
   ) AS model
SELECT
time,
evalMLMethod(model, temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[7], temp[8], temp[9], temp[10],
temp[11], temp[12], temp[13], temp[14], temp[15], temp[16], temp[17], temp[18], temp[19], temp[20],
temp[21], temp[22], temp[23], temp[24], temp[25], temp[26], temp[27], temp[28], temp[29], temp[30],
delta_t[1], delta_t[2], delta_t[3], delta_t[4], delta_t[5], delta_t[6], delta_t[7], delta_t[8], delta_t[9], delta_t[10],
delta_t[11], delta_t[12], delta_t[13], delta_t[14], delta_t[15], delta_t[16], delta_t[17], delta_t[18], delta_t[19], delta_t[20],
delta_t[21], delta_t[22], delta_t[23], delta_t[24], delta_t[25], delta_t[26], delta_t[27], delta_t[28], delta_t[29], delta_t[30],
delta_t[31], toMonth(time)) AS predicted,
   temp[31] AS actual
FROM model_data ORDER BY time DESC LIMIT 200"

echo ${run_model} | docker exec -i clickhouse clickhouse-client

Прогнозируемые значения находятся в середине, а фактические — справа.

Довольно неплохо для этого интервала, хоть и не обошлось без 10-градусного отклонения для 8/7 18:53. Я сохранил все результаты в файл .csv, чтобы позже построить график с помощью следующей команды:

echo "sql command..." | docker exec -i clickhouse clickhouse-client --format CSV> result.csv

Результаты:

Выше приведены прогнозы моей лучшей модели. Ее максимальные и минимальные предсказания были очень точными, а средние отклонения между каждой предсказанной температурой и фактической были почти нулевыми.

Прежде чем показать его график в Excel, вот некоторые из моих предыдущих результатов для контекста:

step=0.00055, norm=0.1, batch=10

Первая попытка
Первая попытка

Вот это да, вот это отклонения. Если не обращать внимания на очевидные резкие отклонения, верхняя граница прогнозируемых значений выглядит почти как ровная линия.

step=0.00010, norm=0.15, batch=5, method=”Adam”
Вторая попытка
Вторая попытка

Этот вариант выглядит лучше. Отклонения менее экстремальны, а прогнозируемые значения следуют общей тенденции.

А теперь моя финальная модель:

step=0.000040, norm=0.15, batch=3, method=”Adam”
Последняя попытка
Последняя попытка

Отклонения уменьшились, а прогнозы более точно следуют синусоидальному тренду. Если разделить в Excel графики данных из последней попытки, то можно заметить, что пики действительно присутствуют в исходных данных, но модель их просто преувеличивает, как, например, при внезапном провале в июне 2016 года. Не обращайте внимания на значения оси y в этом случае:

На этих трех графиках я уменьшал как размер шага, так и размер батча. Большие размеры шага приводили к диким колебаниям результатов между последовательными прогонами и экстремальным выбросам. Увеличение размера батча, как правило, уменьшало величину этих выбросов, но при этом усредняло прогнозы, в результате чего получалась ровная линия, которая минимально менялась в течение года. (Интересно, что сглаживалась только верхняя граница; минимальные значения продолжали следовать правильной тенденции в более холодные месяцы года). Уменьшение степени "сплющивания" за счет уменьшения размера батча, а также уменьшение величины выбросов за счет уменьшения размера шага позволило получить наиболее точные результаты.

Добавление большего количества значимых факторов, таких как влажность или атмосферное давление, вероятно, повысит точность модели, но в любом случае основное внимание я уделял ETL-конвейеру, так что этого вполне достаточно.

Спасибо за ваше время! Надеюсь, вы узнали что-то полезное.

Всё о работе с ClickHouse: от установки и настройки, до продовых решений, можно узнать на онлайн-курсе «ClickHouse для инженеров и архитекторов БД» под руководством экспертов.

Комментарии (3)


  1. savostin
    03.10.2024 17:28
    +1

    Не очень понимаю зачем тут кафка. Чисто академический интерес?


    1. zartdinov
      03.10.2024 17:28

      Ну у него температура приходит один раз в час, поэтому не знаю зачем тут еще горутины, каналы, конвейеры, кластеры, зукиперы и тд.


  1. polRk
    03.10.2024 17:28

    Почему просто не взять ytsaurus и не сделать etl процесс на обычном stdin/stdout ?