Пару слов обо мне
Меня зовут Дмитрий, я являюсь PHP разработчиком. Работаю с Битрикс24, Laravel и Go.
Проблема Битрикса
Как известно, рекомендуемое окружение для Битрикса – их собственная разработка BitrixVM на базе CentOS. Иногда такое окружение не устраивает заказчиков, поэтому выбирают Docker или сервер с установленным LEMP стеком.
При переходе на окружение отличное от BitrixVM, существует две основные проблемы – отсутствие сервера очередей Push&Pull и сервера конвертации файлов.
Первая проблема решаема: на просторах гитхаба уже существует рабочее решение для развертывания локального сервера в Docker. А также можно использовать облачный сервер, так как для его работы не требуется, чтобы портал был доступен извне.
Со второй проблемой облако уже не всегда возможно использовать по ряду причин:
- необходимость доступности сайта извне (не подходит для полностью закрытых окружений); 
- опасения заказчиков по поводу передачи конфиденциальных документов на облачный сервер. 
Единственное решение – установка BitrixVM на отдельном сервере/в докере, с развертыванием бэкапа внутри и использование штатного сервера, встроенного в окружение, что далеко не всегда удобно.
С этим сталкивался и я на закрытых окружениях крупных российских заказчиков, которые далеко не всегда соглашались открывать доступ к порталу из-за соображений безопасности.
После нескольких таких кейсов было решено написать свой полноценный аналог, который бы отвечал нескольким критериям:
- Работал в докере и запускался в несколько кликов; 
- Была возможность легко задать количество воркеров под различную нагрузку. 
В качестве языка программирования было решено выбрать Go.
Реализация
Для начала нам необходимо понять, что представляет собой штатный сервер конвертации: по сути, это не более чем:
- API сервер с одним эндпоинтом; 
- RabbitMQ сервер; 
- Consumer для обработки заданий из очереди, со следующим установленным ПО: LibreOffice, MPEG, ImageMagic. 
Посмотреть реализацию на PHP можно в исходниках модуля Битрикса. Для упрощения я покажу только основной код, остальное всегда можно будет посмотреть в репозитории.
API сервер
Для API мы будем использовать библиотеку go-chi/chi.
Реализуем единственный обработчик – convert, с преобразованием данных в структуру ConvertTask и последующим добавлением в одну из очередей:
- main_preview – используется в предпросмотре файлов; 
- documentgenerator_create – используется в генераторе документов CRM. 
// internal/http-server/handlers/convert/convert.go
func New(ctx context.Context, log *slog.Logger, rabbit *rabbitmq.Rabbit) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		const op = "handlers.convert.New"
		reqId := middleware.GetReqID(r.Context())
		log = log.With(
			slog.String("op", op),
			slog.String("request_id", reqId),
		)
		err := r.ParseForm()
		if err != nil {
			log.Error("failed to decode request body", sl.Err(err))
			render.JSON(w, r, resp.Error("failed to decode request body", 152))
		}
		task, err := prepareOptions(r.Form, reqId)
		if err != nil {
			log.Error("failed to prepare options", sl.Err(err))
			render.JSON(w, r, resp.Error("failed to parse task", 0))
			return
		}
		if task.Queue == "" {
			task.Queue = rabbit.DefaultQueue()
			log.Warn("not found queue. Set default", slog.String("default_queue", task.Queue))
		}
		taskMsg, err := json.Marshal(task)
		if err != nil {
			log.Error("Error parse request", sl.Err(err))
			render.JSON(w, r, resp.Error("Error parse request", 0))
			return
		}
		err = rabbit.Publish(task.Queue, taskMsg)
		if err != nil {
			log.Error("error publish task", slog.String("queue", task.Queue), sl.Err(err))
			render.JSON(w, r, resp.Error("error publish task", 0))
		}
		render.JSON(w, r, resp.Success())
	}
}FileUploader
Создадим структуру FileUploader и реализуем следующие базовые методы:
- Download – скачивание конвертируемого файла из Б24; 
- uploadFile – загрузка готового файла в Б24; 
- Complete – отправка запроса о завершении конвертации; 
- 
getUploadInfo – получение информации о загружаемом файле. Для каждого запроса заводим структуры. Оборачиваем запросы в функцию retry.Do из библиотеки avast/retry-go. Устанавливаем 3 попытки на подключение (на случай сетевых проблем). 
// internal/lib/fileuploader/fileuploader.go
Пример метода получения информации о файле
func (f *FileUploader) getUploadInfo(file string, key string) (*uploadInfoResp, error) {
	fileInfo, err := os.Stat(file)
	if err != nil {
		return nil, fmt.Errorf("error get file info [%s]: [%w]", file, err)
	}
	uploadReq := uploadInfoRequest{
		FileId:   key,
		FileSize: fileInfo.Size(),
		Upload:   "where",
	}
	v, err := query.Values(uploadReq)
	if err != nil {
		return nil, fmt.Errorf("error convert struct request to query: [%w]", err)
	}
	res, err := http.PostForm(f.url, v)
	if err != nil {
		return nil, fmt.Errorf("error get upload info from [%s]: [%w]", f.url, err)
	}
	var uploadInfoRes uploadInfoResp
	body, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("wrong response upload info request to url [%s]: [%w]", f.url, err)
	}
	if err = json.Unmarshal(body, &uploadInfoRes); err != nil {
		return nil, fmt.Errorf("error unmarshal upload info request to url [%s]: [%w]", f.url, err)
	}
	return &uploadInfoRes, nil
}
Клиент RabbitMQ
Создаем структуру RabbitMQ и реализуем базовые методы:
- Connect - подключение к rabbitMQ; 
- Reconnect - реконнект в случае разрыва соединения, запускаем в отдельной горутине; 
- InitQueue - инициализация основной очереди и Dead Letter; 
- Consume - чтение сообщений из очереди; 
- Publish - отправка сообщения в очередь. 
// internal/lib/rabbitmq/rabbitmq.go
package rabbitmq
import (
	"bitrix-converter/internal/config"
	"bitrix-converter/internal/lib/logger/sl"
	"context"
	"fmt"
	amqp "github.com/rabbitmq/amqp091-go"
	"log/slog"
	"time"
)
type Rabbit struct {
	conn *amqp.Connection
	log  *slog.Logger
	cfg  config.RabbitConfig
}
func New(log *slog.Logger, cfg config.RabbitConfig) *Rabbit {
	return &Rabbit{
		log: log,
		cfg: cfg,
	}
}
func (r *Rabbit) DefaultQueue() string {
	return r.cfg.DefaultQueue
}
func (r *Rabbit) Connect() error {
	url := fmt.Sprintf("amqp://%s:%s@%s:%s", r.cfg.User, r.cfg.Password, r.cfg.Host, r.cfg.Port)
	conn, err := amqp.Dial(url)
	if err != nil {
		return fmt.Errorf("failed to connect to RabbitMQ: [%w]", err)
	}
	r.conn = conn
	return nil
}
func (r *Rabbit) Channel() (*amqp.Channel, error) {
	return r.conn.Channel()
}
func (r *Rabbit) Reconnect() {
	for {
		_, ok := <-r.conn.NotifyClose(make(chan *amqp.Error))
		if !ok {
			r.log.Error("failed notifying rabbitMQ channel. Reconnecting...")
		}
		r.log.Error("rabbitmq connection closed unexpectedly. Reconnecting...")
		for {
			err := r.Connect()
			if err == nil {
				r.log.Info("rabbitMQ reconnect success")
				break
			}
			r.log.Error("rabbitmq reconnect failed. Retry after 10 seconds", sl.Err(err))
			time.Sleep(10 * time.Second)
		}
	}
}
func (r *Rabbit) InitQueue(ch *amqp.Channel, queue string) error {
	dlQueue := queue + "_dead"
	_, err := ch.QueueDeclare(
		dlQueue,
		true,
		false,
		false,
		false,
		amqp.Table{},
	)
	if err != nil {
		return fmt.Errorf("failed to declare dead letter queue: [%w]", err)
	}
	_, err = ch.QueueDeclare(
		queue,
		true,
		false,
		false,
		false,
		amqp.Table{
			"x-dead-letter-exchange":    "",
			"x-dead-letter-routing-key": dlQueue,
			"x-message-ttl":             60480000,
		},
	)
	if err != nil {
		return fmt.Errorf("failed to declare queue: [%w]", err)
	}
	return nil
}
func (r *Rabbit) Consume(ch *amqp.Channel, queue string) (msgs <-chan amqp.Delivery, err error) {
	msgs, err = ch.Consume(
		queue,
		"",
		false,
		false,
		false,
		false,
		nil,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to consume queue: [%w]", err)
	}
	return msgs, nil
}
func (r *Rabbit) Publish(queue string, message []byte) error {
	ch, err := r.Channel()
	if err != nil {
		return fmt.Errorf("failed to open channel: [%w]", err)
	}
	defer ch.Close()
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	err = ch.PublishWithContext(
		ctx,
		"",
		queue,
		false,
		false,
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        message,
		},
	)
	if err != nil {
		return fmt.Errorf("failed to publish message: [%w]", err)
	}
	return nil
}
func (r *Rabbit) Connection() *amqp.Connection {
	return r.conn
}
Command
Создадим BaseCommand с общим методом Execute и две реализации: DocumentCommand (LibreOffice) и VideoCommand (MPEG).
// internal/lib/command/command.go
func (bs *BaseCommand) Execute() error {
	if err := bs.validate(); err != nil {
		return fmt.Errorf("failed validate transform task: [%w]", err)
	}
	directory := bs.DownloadDir()
	err := os.MkdirAll(directory, 0755)
	if err != nil {
		return fmt.Errorf("error creating directory [%s]: [%w]", directory, err)
	}
	filePath := bs.genTmpFilePath(directory)
	err = retry.Do(
		func() error {
			return bs.uploader.Download(bs.task.File, filePath, bs.MaxSize())
		},
		retry.Attempts(3),
		retry.OnRetry(func(n uint, err error) {
			time.Sleep(1 * time.Second)
		}),
	)
	bs.uploader.AddFileToDelete(filePath)
	defer bs.uploader.DeleteFiles()
	if err != nil {
		return fmt.Errorf("error download file [%s]: [%w]", bs.task.File, err)
	}
	bs.file = filePath
	for _, format := range bs.task.Formats {
		if _, ok := bs.files[format]; ok {
			continue
		}
		pre, err := bs.preConvert(format, filePath)
		if err != nil {
			return err
		}
		if pre {
			continue
		}
		convertedFile, err := bs.transform(format, filePath)
		bs.uploader.AddFileToDelete(convertedFile)
		if err != nil {
			return fmt.Errorf("error transform file [%s] to [%s]: [%w]", bs.task.File, format, err)
		}
		bs.files[format] = convertedFile
	}
	bs.uploader.SetFiles(bs.files)
	err = bs.uploader.UploadFiles()
	if err != nil {
		return fmt.Errorf("error uploading files: [%w]", err)
	}
	err = bs.uploader.Complete()
	if err != nil {
		return fmt.Errorf("failed complete: [%w]", err)
	}
	return nil
}Consumer
В текущей реализации запускаем 3 горутины на каждую очередь, в одном контейнере.
Настроим graceful завершение (с таймаутом в 5 минут) и реконнект к rabbitmq (в случае разрыва соединения обработчики будут подключаться заново и после восстановления соединения продолжат работу в штатном режиме)
// cmd/consumer/main.go
func main() {
	cfg := config.MustLoad()
	logger := sl.SetupLogger(cfg.Env)
	rabbit := rabbitmq.New(logger, cfg.Rabbit)
	conErr := rabbit.Connect()
	if conErr != nil {
		log.Fatalf("failed connect to RabbitMQ with start %v", conErr)
	}
	go rabbit.Reconnect()
	cancelCtx, cancel := context.WithCancel(context.Background())
	wg := &sync.WaitGroup{}
	for i := 0; i <= 3; i++ {
		for _, queue := range queues {
			go func() {
				uniqId := fmt.Sprintf("%s_%d", queue, i)
				logger.Info("start consumer", slog.String("queue", queue))
				wg.Add(1)
			done:
				for {
					time.Sleep(10 * time.Second)
					ch, err := rabbit.Channel()
					if err != nil {
						logger.Error("failed to open channel. Retry", slog.String("queue", queue), sl.Err(err))
						continue
					}
					defer ch.Close()
					err = rabbit.InitQueue(ch, queue)
					if err != nil {
						logger.Error("failed init queue. Retry", slog.String("queue", queue), sl.Err(err))
						continue
					}
					logger.Info("success init queue", slog.String("queue", queue))
					msgs, err := rabbit.Consume(ch, queue)
					if err != nil {
						logger.Error("failed consume. Retry", slog.String("queue", queue), sl.Err(err))
						continue
					}
					logger.Info("success consume. Waiting messages", slog.String("queue", queue))
				closed:
					for {
						select {
						case <-cancelCtx.Done():
							break done
						default:
						}
						select {
						case d, ok := <-msgs:
							if !ok {
								logger.Info("channel closed", slog.String("queue", queue))
								break closed
							}
							handleMessage(d, logger, cfg, uniqId)
						default:
						}
                        time.Sleep(1 * time.Second)
					}
				}
				wg.Done()
			}()
		}
	}
	waitCh := make(chan struct{})
	ch := make(chan os.Signal)
	signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGKILL)
	<-ch
	logger.Info("receive a shutdown signal")
	go func() {
		logger.Info("cancel, wait consumer")
		cancel()
		wg.Wait()
		close(waitCh)
	}()
	select {
	case <-waitCh:
		logger.Info("graceful shutdown")
	case <-time.After(5 * time.Minute):
		logger.Info("shutdown before 5 minutes timeout")
	}
}
Логирование
Для логирования используем библиотеку slog, для группировки логов используем параметр request_id.
Dead Letter Queue
Для каждой очереди инициализируем одноимённую очередь с префиксом dead.
В случае ошибки конвертации делаем reject, и задание попадает в dead очередь, для последующего анализа. По умолчанию время жизни сообщения - неделя.
Где посмотреть?
Сервер разместил на GitHub, текущая версия 1.0.0. В дальнейшем возможно будет обновляться. Инструкция по развертыванию прилагается.
Итого
Надеюсь, это решение упростит отказ от CentOS там, где это необходимо. При возникновении проблем создавайте issue на GitHub.
 
          