AWS SQS играет значительную роль в современной архитектуре приложений, особенно в бессерверной среде. При работе с SQS часто можно увидеть, что сообщения не были прочитаны; причиной могут быть ошибка в вашем коде, временное ограничение ресурсов, превышение бюджета API или зависимости в сообщениях, которые должны быть обработаны. В большинстве случаев вы хотели бы знать, что это за сообщения, если они много раз терпят неудачу, а затем узнать, почему, и устранить проблемы. Именно здесь в игру вступает очередь недоставленных сообщений SQS.





Однако мониторинг недоставленного (сленг: «мертвого») сообщения может быть сложной задачей. Один из наиболее распространённых подходов — настройка CloudWatch для отправки сигналов тревоги, но люди часто сталкиваются с двумя проблемами:

  1. Нет деталей о недоставленных сообщениях. CloudWatch только показывает, что есть сообщения в очереди недоставленных сообщений, не сообщая деталей, чтобы найти более подробную информацию, DevOps часто приходится использовать другие инструменты, например AWS CLI.
  2. Нет возможности воспроизвести недоставленные сообщения, то есть система не в состоянии вернуть недоставленное сообщение в SQS, по крайней мере это не так легко. Можно использовать AWS CLI, чтобы вернуть их обратно, но опять же это делает устранение уже неприятных неполадок еще более неприятным.

Вышеперечисленные проблемы могут быть решены с помощью Slack и Lambda, как показано ниже.

Очередь недоставленных сообщений SQS является триггером событий лямбда-функции, которая отправляет уведомления Slack, затем Slack передаёт действия пользователя обратно лямбда-функции и, наконец, помещает сообщения обратно в SQS.

Часть 1. Slack


Сначала нам нужно создать приложение в Slack, а затем написать приложение, чтобы оно принимало действия пользователя и взаимодействовало с Lambda.


Перейдите на api.slack.com, чтобы создать приложение, если у вас его ещё нет.

Когда приложение будет готово, мы можем создать входящий веб-хук, на который лямбда-функция будет отправлять уведомления. Сохраните URL веб-хука, который понадобится позже.



Создаём интерактивность, URL запроса — это место, куда Slack посылает действия пользователя, конечная точка шлюза API функции Lambda.



Часть 2. Lambda


Я использую бессерверный фреймворк для управления лямбда-функциями. У нас будет две лямбда-функции:

  1. Функция мониторинга. Источник события функции — очередь недоставленных сообщений SQS, поэтому, когда в очереди недоставленных сообщений появляется сообщение, эта функция активируется, а затем переадресует сообщение в Slack.
  2. Функция команды. Эта функция отвечает за прослушивание действий Slack, то есть нажатие кнопки отправляет сообщение обратно в исходную SQS.

Файл Serverless.yml показывает, как настраиваются эти две функции
service: slack-sqs-monitor
frameworkVersion: "2.9.0"
provider:
  name: aws
  versionFunctions: false
  runtime: nodejs12.x
  region: ap-southeast-2
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        # You should only give least permissions to your functions.
        - "sqs:*"
      Resource:
        - arn:aws:sqs:ap-southeast-2:xxxxxxxx:sqs.fifo # The original SQS arn
        - arn:aws:sqs:ap-southeast-2:xxxxxxxx:deadletter.fifo # The dead letter queue arn

plugins:
  - serverless-webpack
  - serverless-domain-manager

custom:
  customDomain:
    rest:
      domainName: labs.mianio.com
      basePath: sqs-command
      createRoute53Record: true
      securityPolicy: tls_1_2
  webpack:
    webpackConfig: "webpack.config.js"
    packager: "yarn"

functions:
  monitor:
    handler: functions/monitor.handler
    desciption: The function has the dead letter queue as the event, and forward the event to Slack
    tags:
      name: Monitor
    environment:
#     This is the webhook URL from the previous step
      SLACK_ENDPOINT: https://hooks.slack.com/services/XXXXXXX/XXXXXX/XXXXXXXXX
    events:
      - sqs:
#       Dead letter queue ARN
          arn: arn:aws:sqs:ap-southeast-2:xxxxxxxx:deadletter.fifo

  command:
    handler: functions/command.handler
    tags:
      name: Command
      desciption: The function handles Slack action and place the message back to the queue
    environment:
    # Credentials should be retrieved from Parameter Store 
      SLACK_SIGNING_SECRET: ${ssm:/deadletter/slack/signing-secret~true}
      SLACK_OAUTH_TOKEN: ${ssm:/deadletter/slack/oauth-token~true}
    events:
      - http:
          path: slack
          method: post
          cors: true


  • В функция мониторинга есть SLACK_ENDPOINT в качестве переменной окружения, которая будет использоваться для публикации в Slack.
  • Функция command находится за шлюзом API, конечная точка — URL запроса для интерактивности Slack.

Функция декомпозирует события из очереди недоставленных сообщений и создаёт полезную нагрузку Slack для отправки. Смотрите api.slack.com/block-kit, чтобы узнать подробности о блоках для разработки в Slack.

Функция мониторинга
import middy from "@middy/core";
import axios from "axios";
import doNotWaitForEmptyEventLoop from "@middy/do-not-wait-for-empty-event-loop";
export const monitor = async (event: any): Promise<any> => {
  const records = event.Records;
  await Promise.all(
    records.map((record: any) => {
      const messageGroupId = record?.attributes?.MessageGroupId;
      const messageDeduplicationId = record?.attributes?.MessageDeduplicationId;
      const approximateReceiveCount =
        record?.attributes?.ApproximateReceiveCount;

      return axios({
        method: "post",
        url: process.env.SLACK_ENDPOINT,
        data: {
          blocks: [
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Messsge ID*: ${record.messageId}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Message Group Id*: ${messageGroupId}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Message Deduplication Id*: ${messageDeduplicationId}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: `*Approximate Receive Count*: ${approximateReceiveCount}`,
              },
            },
            {
              type: "section",
              text: {
                type: "mrkdwn",
                text: record.body,
              },
            },
            {
              type: "actions",
              elements: [
                {
                  type: "button",
                  style: "primary",
                  text: {
                    type: "plain_text",
                    text: "Send back",
                  },
                  action_id: "sendback",
                  value: record.body,
                },
              ],
            },
          ],
        },
        headers: {
          "Content-type": "application/json; charset=utf-8",
        },
      });
    })
  );
  return;
};

export const salesforceDeadLetterMonitor = middy(monitor).use(
  doNotWaitForEmptyEventLoop()
);


Каждый раз, когда сообщение поступает в очередь недоставленных сообщений, оно перенаправляется на канал Slack, где вы сможете не только понять, почему сообщение не отправляется, но и действовать.



Часть 3. Отправка обратно


Захватывающая деталь проекта — это возможность отправить недоставленное сообщение обратно в SQS для переработки. При нажатии зеленой кнопки Send back Slack запускает POST-запрос на определённый ранее URL-адрес действия, то есть конечную точку шлюза API.

Функция command, которая отправляет недоставленное сообщение обратно в SQS
import { APIGatewayEvent } from "aws-lambda";
import AWS from "aws-sdk";
import qs from "qs";
import axios from "axios";
import middy from "@middy/core";
import doNotWaitForEmptyEventLoop from "@middy/do-not-wait-for-empty-event-loop";
import httpHeaderNormalizer from "@middy/http-header-normalizer";
import httpEventNormalizer from "@middy/http-event-normalizer";
import httpErrorHandler from "@middy/http-error-handler";
import { slackVerifier } from "../../middlewares/slack/verify";

const sqs = new AWS.SQS({ region: "ap-southeast-2" });
const command = async (event: APIGatewayEvent) => {
  if (!event.body) return { statusCode: 200 };
  const requestBody: any = qs.parse(event.body);
  const payload: any = JSON.parse(requestBody.payload);
  let response;

  const action = payload.actions[0];
  if (action.action_id === "sendback") {
    try {
      const sqsPayload = payload.message.blocks.find(
        (block: any) => block.block_id === "payload"
      );
      if (sqsPayload?.text?.text && action?.value) {
        const payload = JSON.parse(sqsPayload.text.text);

        await putBack(payload.jobName, payload.jobData, action.value);

        response = {
          payload: {
            attachments: [
              {
                color: "good",
                text: "Job was sent back",
              },
            ],
            response_type: "in_channel",
          },
        };
      }
    } catch (error) {
      console.error(error);
    }
  }

  if (payload.response_url) {
    await axios({
      method: "post",
      url: payload.response_url,
      data: response.payload,
      headers: {
        "Content-type": "application/json; charset=utf-8",
        Authorization: `Bearer ${process.env.SLACK_OAUTH_TOKEN}`,
      },
    });
  } else if (response && !payload.response_url && response.payload) {
    return {
      body: JSON.stringify(response.payload),
      statusCode: 200,
    };
  } else {
    return {
      statusCode: 200,
    };
  }
};

const putBack = async (name: string, data: any, workerUrl: string) => {
  const params: any = {
    MessageBody: JSON.stringify({ jobName: name, jobData: data }),
    QueueUrl: workerUrl,
  };
  return new Promise((resolve: Function, reject: Function): any => {
    sqs.sendMessage(params, (err: any, data: any): any => {
      if (err) {
        reject(err);
      } else {
        resolve(data);
      }
    });
  });
};

export const handler = middy(command)
  .use(doNotWaitForEmptyEventLoop())
  .use(httpEventNormalizer())
  .use(httpHeaderNormalizer())
  .use(slackVerifier())
  .use(httpErrorHandler());


Эта функция довольно проста:

  • Функция slackVerifier. Она проверяет, что POST-запрос направлен от Slack.

verifier.ts
import crypto from 'crypto';
import qs from 'qs';

export const slackVerifier = () => {
  return {
    before: async (handler: any) => {
      const slackSignature =
        handler.event.headers && handler.event.headers['x-slack-signature'];
      const timestamp =
        handler.event.headers &&
        handler.event.headers['x-slack-request-timestamp'];
      const time = Math.floor(new Date().getTime() / 1000);
      if (Math.abs(time - timestamp) > 300) {
        //  The request timestamp is more than five minutes from local time.
        // It could be a replay attack, so let's ignore it.

        return {
          statusCode: 401,
          body: JSON.stringify('Too old'),
        };
      }
      const body = handler.event.body;
      const sigBasestring = `v0:${timestamp}:${body}`;

      const hash = crypto
        .createHmac('sha256', process.env.SLACK_SIGNING_SECRET)
        .update(sigBasestring, 'utf8')
        .digest('hex');

      const mySignature = `v0=${hash}`;
      if (
        !crypto.timingSafeEqual(
          Buffer.from(mySignature, 'utf8'),
          Buffer.from(slackSignature, 'utf8')
        )
      ) {
        return {
          statusCode: 401,
          body: JSON.stringify('Invalid Signature'),
        };
      }
      return;
    },

    onError: (handler: any) => {
      return handler.callback(null, handler.error);
    },
  };
};
view raw

Переменная среды SLACKSIGNINGSECRET — это переменная со страницы конфигурации Slack, которая вводится из определений бессерверной среды Serverless.yml.



Я настоятельно рекомендую хранить учётные данные в безопасном месте; извлекать их во время развертывания и держать вне исходного кода— это хорошая практика.

SLACK_SIGNING_SECRET: ${ssm:/deadletter/slack/signing-secret~true}

  • Функция sendBack. Она получает полезную нагрузку от POST-запроса Slack и отправляет ее обратно в SQS.

Полезная нагрузка, которую Slack отправляет в command-функцию, выглядит так.



Она содержит response_url, который должен использоваться для отправки ответа обратно в Slack, чтобы подтвердить действие.



Код отправляет [полезную нагрузку ответа] обратно в Slack с помощью Bearer-токена.

await axios({
    method: "post",
    url: payload.response_url,
    data: response.payload,
    headers: {
       "Content-type": "application/json; charset=utf-8",
       Authorization: `Bearer ${process.env.SLACK_OAUTH_TOKEN}`
    }
});

SLACK_OAUTH_TOKEN вводится из переменных среды во время развёртывания. Вы можете получить её значение на странице конфигурирования Slack:



Создавать такой проект интересно, и, что ещё важнее, он немного облегчает жизнь DevOps. Понемногу эти маленькие удобства создают комфортную среду для работы в целом.

Задумали с нового года начать новую жизнь и подучиться? До конца этого года ещё можно ухватить курс с хорошей скидкой. А если использовать промокод HABR — к скидке на баннере можно прибавлять еще 10%.

image