AWS SQS играет значительную роль в современной архитектуре приложений, особенно в бессерверной среде. При работе с SQS часто можно увидеть, что сообщения не были прочитаны; причиной могут быть ошибка в вашем коде, временное ограничение ресурсов, превышение бюджета API или зависимости в сообщениях, которые должны быть обработаны. В большинстве случаев вы хотели бы знать, что это за сообщения, если они много раз терпят неудачу, а затем узнать, почему, и устранить проблемы. Именно здесь в игру вступает очередь недоставленных сообщений SQS.
Однако мониторинг недоставленного (сленг: «мертвого») сообщения может быть сложной задачей. Один из наиболее распространённых подходов — настройка CloudWatch для отправки сигналов тревоги, но люди часто сталкиваются с двумя проблемами:
Вышеперечисленные проблемы могут быть решены с помощью Slack и Lambda, как показано ниже.
Очередь недоставленных сообщений SQS является триггером событий лямбда-функции, которая отправляет уведомления Slack, затем Slack передаёт действия пользователя обратно лямбда-функции и, наконец, помещает сообщения обратно в SQS.
Сначала нам нужно создать приложение в Slack, а затем написать приложение, чтобы оно принимало действия пользователя и взаимодействовало с Lambda.
Перейдите на api.slack.com, чтобы создать приложение, если у вас его ещё нет.
Когда приложение будет готово, мы можем создать входящий веб-хук, на который лямбда-функция будет отправлять уведомления. Сохраните URL веб-хука, который понадобится позже.
Создаём интерактивность, URL запроса — это место, куда Slack посылает действия пользователя, конечная точка шлюза API функции Lambda.
Я использую бессерверный фреймворк для управления лямбда-функциями. У нас будет две лямбда-функции:
Функция декомпозирует события из очереди недоставленных сообщений и создаёт полезную нагрузку Slack для отправки. Смотрите api.slack.com/block-kit, чтобы узнать подробности о блоках для разработки в Slack.
Каждый раз, когда сообщение поступает в очередь недоставленных сообщений, оно перенаправляется на канал Slack, где вы сможете не только понять, почему сообщение не отправляется, но и действовать.
Захватывающая деталь проекта — это возможность отправить недоставленное сообщение обратно в SQS для переработки. При нажатии зеленой кнопки Send back Slack запускает POST-запрос на определённый ранее URL-адрес действия, то есть конечную точку шлюза API.
Эта функция довольно проста:
Переменная среды SLACKSIGNINGSECRET — это переменная со страницы конфигурации Slack, которая вводится из определений бессерверной среды Serverless.yml.
Я настоятельно рекомендую хранить учётные данные в безопасном месте; извлекать их во время развертывания и держать вне исходного кода— это хорошая практика.
Полезная нагрузка, которую Slack отправляет в command-функцию, выглядит так.
Она содержит response_url, который должен использоваться для отправки ответа обратно в Slack, чтобы подтвердить действие.
Код отправляет [полезную нагрузку ответа] обратно в Slack с помощью Bearer-токена.
SLACK_OAUTH_TOKEN вводится из переменных среды во время развёртывания. Вы можете получить её значение на странице конфигурирования Slack:
Создавать такой проект интересно, и, что ещё важнее, он немного облегчает жизнь DevOps. Понемногу эти маленькие удобства создают комфортную среду для работы в целом.
Задумали с нового года начать новую жизнь и подучиться? До конца этого года ещё можно ухватить курс с хорошей скидкой. А если использовать промокод HABR — к скидке на баннере можно прибавлять еще 10%.
Однако мониторинг недоставленного (сленг: «мертвого») сообщения может быть сложной задачей. Один из наиболее распространённых подходов — настройка CloudWatch для отправки сигналов тревоги, но люди часто сталкиваются с двумя проблемами:
- Нет деталей о недоставленных сообщениях. CloudWatch только показывает, что есть сообщения в очереди недоставленных сообщений, не сообщая деталей, чтобы найти более подробную информацию, DevOps часто приходится использовать другие инструменты, например AWS CLI.
- Нет возможности воспроизвести недоставленные сообщения, то есть система не в состоянии вернуть недоставленное сообщение в SQS, по крайней мере это не так легко. Можно использовать AWS CLI, чтобы вернуть их обратно, но опять же это делает устранение уже неприятных неполадок еще более неприятным.
Вышеперечисленные проблемы могут быть решены с помощью Slack и Lambda, как показано ниже.
Очередь недоставленных сообщений SQS является триггером событий лямбда-функции, которая отправляет уведомления Slack, затем Slack передаёт действия пользователя обратно лямбда-функции и, наконец, помещает сообщения обратно в SQS.
Часть 1. Slack
Сначала нам нужно создать приложение в Slack, а затем написать приложение, чтобы оно принимало действия пользователя и взаимодействовало с Lambda.
Перейдите на api.slack.com, чтобы создать приложение, если у вас его ещё нет.
Когда приложение будет готово, мы можем создать входящий веб-хук, на который лямбда-функция будет отправлять уведомления. Сохраните URL веб-хука, который понадобится позже.
Создаём интерактивность, URL запроса — это место, куда Slack посылает действия пользователя, конечная точка шлюза API функции Lambda.
Часть 2. Lambda
Я использую бессерверный фреймворк для управления лямбда-функциями. У нас будет две лямбда-функции:
- Функция мониторинга. Источник события функции — очередь недоставленных сообщений SQS, поэтому, когда в очереди недоставленных сообщений появляется сообщение, эта функция активируется, а затем переадресует сообщение в Slack.
- Функция команды. Эта функция отвечает за прослушивание действий Slack, то есть нажатие кнопки отправляет сообщение обратно в исходную SQS.
Файл Serverless.yml показывает, как настраиваются эти две функции
service: slack-sqs-monitor
frameworkVersion: "2.9.0"
provider:
name: aws
versionFunctions: false
runtime: nodejs12.x
region: ap-southeast-2
iamRoleStatements:
- Effect: "Allow"
Action:
# You should only give least permissions to your functions.
- "sqs:*"
Resource:
- arn:aws:sqs:ap-southeast-2:xxxxxxxx:sqs.fifo # The original SQS arn
- arn:aws:sqs:ap-southeast-2:xxxxxxxx:deadletter.fifo # The dead letter queue arn
plugins:
- serverless-webpack
- serverless-domain-manager
custom:
customDomain:
rest:
domainName: labs.mianio.com
basePath: sqs-command
createRoute53Record: true
securityPolicy: tls_1_2
webpack:
webpackConfig: "webpack.config.js"
packager: "yarn"
functions:
monitor:
handler: functions/monitor.handler
desciption: The function has the dead letter queue as the event, and forward the event to Slack
tags:
name: Monitor
environment:
# This is the webhook URL from the previous step
SLACK_ENDPOINT: https://hooks.slack.com/services/XXXXXXX/XXXXXX/XXXXXXXXX
events:
- sqs:
# Dead letter queue ARN
arn: arn:aws:sqs:ap-southeast-2:xxxxxxxx:deadletter.fifo
command:
handler: functions/command.handler
tags:
name: Command
desciption: The function handles Slack action and place the message back to the queue
environment:
# Credentials should be retrieved from Parameter Store
SLACK_SIGNING_SECRET: ${ssm:/deadletter/slack/signing-secret~true}
SLACK_OAUTH_TOKEN: ${ssm:/deadletter/slack/oauth-token~true}
events:
- http:
path: slack
method: post
cors: true
- В функция мониторинга есть SLACK_ENDPOINT в качестве переменной окружения, которая будет использоваться для публикации в Slack.
- Функция command находится за шлюзом API, конечная точка — URL запроса для интерактивности Slack.
Функция декомпозирует события из очереди недоставленных сообщений и создаёт полезную нагрузку Slack для отправки. Смотрите api.slack.com/block-kit, чтобы узнать подробности о блоках для разработки в Slack.
Функция мониторинга
import middy from "@middy/core";
import axios from "axios";
import doNotWaitForEmptyEventLoop from "@middy/do-not-wait-for-empty-event-loop";
export const monitor = async (event: any): Promise<any> => {
const records = event.Records;
await Promise.all(
records.map((record: any) => {
const messageGroupId = record?.attributes?.MessageGroupId;
const messageDeduplicationId = record?.attributes?.MessageDeduplicationId;
const approximateReceiveCount =
record?.attributes?.ApproximateReceiveCount;
return axios({
method: "post",
url: process.env.SLACK_ENDPOINT,
data: {
blocks: [
{
type: "section",
text: {
type: "mrkdwn",
text: `*Messsge ID*: ${record.messageId}`,
},
},
{
type: "section",
text: {
type: "mrkdwn",
text: `*Message Group Id*: ${messageGroupId}`,
},
},
{
type: "section",
text: {
type: "mrkdwn",
text: `*Message Deduplication Id*: ${messageDeduplicationId}`,
},
},
{
type: "section",
text: {
type: "mrkdwn",
text: `*Approximate Receive Count*: ${approximateReceiveCount}`,
},
},
{
type: "section",
text: {
type: "mrkdwn",
text: record.body,
},
},
{
type: "actions",
elements: [
{
type: "button",
style: "primary",
text: {
type: "plain_text",
text: "Send back",
},
action_id: "sendback",
value: record.body,
},
],
},
],
},
headers: {
"Content-type": "application/json; charset=utf-8",
},
});
})
);
return;
};
export const salesforceDeadLetterMonitor = middy(monitor).use(
doNotWaitForEmptyEventLoop()
);
Каждый раз, когда сообщение поступает в очередь недоставленных сообщений, оно перенаправляется на канал Slack, где вы сможете не только понять, почему сообщение не отправляется, но и действовать.
Часть 3. Отправка обратно
Захватывающая деталь проекта — это возможность отправить недоставленное сообщение обратно в SQS для переработки. При нажатии зеленой кнопки Send back Slack запускает POST-запрос на определённый ранее URL-адрес действия, то есть конечную точку шлюза API.
Функция command, которая отправляет недоставленное сообщение обратно в SQS
import { APIGatewayEvent } from "aws-lambda";
import AWS from "aws-sdk";
import qs from "qs";
import axios from "axios";
import middy from "@middy/core";
import doNotWaitForEmptyEventLoop from "@middy/do-not-wait-for-empty-event-loop";
import httpHeaderNormalizer from "@middy/http-header-normalizer";
import httpEventNormalizer from "@middy/http-event-normalizer";
import httpErrorHandler from "@middy/http-error-handler";
import { slackVerifier } from "../../middlewares/slack/verify";
const sqs = new AWS.SQS({ region: "ap-southeast-2" });
const command = async (event: APIGatewayEvent) => {
if (!event.body) return { statusCode: 200 };
const requestBody: any = qs.parse(event.body);
const payload: any = JSON.parse(requestBody.payload);
let response;
const action = payload.actions[0];
if (action.action_id === "sendback") {
try {
const sqsPayload = payload.message.blocks.find(
(block: any) => block.block_id === "payload"
);
if (sqsPayload?.text?.text && action?.value) {
const payload = JSON.parse(sqsPayload.text.text);
await putBack(payload.jobName, payload.jobData, action.value);
response = {
payload: {
attachments: [
{
color: "good",
text: "Job was sent back",
},
],
response_type: "in_channel",
},
};
}
} catch (error) {
console.error(error);
}
}
if (payload.response_url) {
await axios({
method: "post",
url: payload.response_url,
data: response.payload,
headers: {
"Content-type": "application/json; charset=utf-8",
Authorization: `Bearer ${process.env.SLACK_OAUTH_TOKEN}`,
},
});
} else if (response && !payload.response_url && response.payload) {
return {
body: JSON.stringify(response.payload),
statusCode: 200,
};
} else {
return {
statusCode: 200,
};
}
};
const putBack = async (name: string, data: any, workerUrl: string) => {
const params: any = {
MessageBody: JSON.stringify({ jobName: name, jobData: data }),
QueueUrl: workerUrl,
};
return new Promise((resolve: Function, reject: Function): any => {
sqs.sendMessage(params, (err: any, data: any): any => {
if (err) {
reject(err);
} else {
resolve(data);
}
});
});
};
export const handler = middy(command)
.use(doNotWaitForEmptyEventLoop())
.use(httpEventNormalizer())
.use(httpHeaderNormalizer())
.use(slackVerifier())
.use(httpErrorHandler());
Эта функция довольно проста:
- Функция slackVerifier. Она проверяет, что POST-запрос направлен от Slack.
verifier.ts
import crypto from 'crypto';
import qs from 'qs';
export const slackVerifier = () => {
return {
before: async (handler: any) => {
const slackSignature =
handler.event.headers && handler.event.headers['x-slack-signature'];
const timestamp =
handler.event.headers &&
handler.event.headers['x-slack-request-timestamp'];
const time = Math.floor(new Date().getTime() / 1000);
if (Math.abs(time - timestamp) > 300) {
// The request timestamp is more than five minutes from local time.
// It could be a replay attack, so let's ignore it.
return {
statusCode: 401,
body: JSON.stringify('Too old'),
};
}
const body = handler.event.body;
const sigBasestring = `v0:${timestamp}:${body}`;
const hash = crypto
.createHmac('sha256', process.env.SLACK_SIGNING_SECRET)
.update(sigBasestring, 'utf8')
.digest('hex');
const mySignature = `v0=${hash}`;
if (
!crypto.timingSafeEqual(
Buffer.from(mySignature, 'utf8'),
Buffer.from(slackSignature, 'utf8')
)
) {
return {
statusCode: 401,
body: JSON.stringify('Invalid Signature'),
};
}
return;
},
onError: (handler: any) => {
return handler.callback(null, handler.error);
},
};
};
view raw
Переменная среды SLACKSIGNINGSECRET — это переменная со страницы конфигурации Slack, которая вводится из определений бессерверной среды Serverless.yml.
Я настоятельно рекомендую хранить учётные данные в безопасном месте; извлекать их во время развертывания и держать вне исходного кода— это хорошая практика.
SLACK_SIGNING_SECRET: ${ssm:/deadletter/slack/signing-secret~true}
- Функция sendBack. Она получает полезную нагрузку от POST-запроса Slack и отправляет ее обратно в SQS.
Полезная нагрузка, которую Slack отправляет в command-функцию, выглядит так.
Она содержит response_url, который должен использоваться для отправки ответа обратно в Slack, чтобы подтвердить действие.
Код отправляет [полезную нагрузку ответа] обратно в Slack с помощью Bearer-токена.
await axios({
method: "post",
url: payload.response_url,
data: response.payload,
headers: {
"Content-type": "application/json; charset=utf-8",
Authorization: `Bearer ${process.env.SLACK_OAUTH_TOKEN}`
}
});
SLACK_OAUTH_TOKEN вводится из переменных среды во время развёртывания. Вы можете получить её значение на странице конфигурирования Slack:
Создавать такой проект интересно, и, что ещё важнее, он немного облегчает жизнь DevOps. Понемногу эти маленькие удобства создают комфортную среду для работы в целом.
Задумали с нового года начать новую жизнь и подучиться? До конца этого года ещё можно ухватить курс с хорошей скидкой. А если использовать промокод HABR — к скидке на баннере можно прибавлять еще 10%.
Другие профессии и курсы
ПРОФЕССИИ
КУРСЫ
- Обучение профессии Data Science
- Обучение профессии Data Analyst
- Профессия Java-разработчик
- Профессия Frontend-разработчик
- Профессия Этичный хакер
- Профессия C++ разработчик
- Профессия Разработчик игр на Unity
- Профессия iOS-разработчик с нуля
- Профессия Android-разработчик с нуля
- Профессия Веб-разработчик
КУРСЫ
- Курс «Математика и Machine Learning для Data Science»
- Курс по Machine Learning
- Продвинутый курс «Machine Learning Pro + Deep Learning»
- Курс «Python для веб-разработки»
- Курс по JavaScript
- Курс по аналитике данных
AlexSpaizNet
А что будет если что-то очень долго лежит/поломалось, и неотправленных месседжей может быть тысячи… десятки тысяч...? Ж)
Matvey-Kuk
Можно отправлять в amixr.io, он сгруппирует все одинаковые сообщения из SQS и даст даже сделать кастомный рендеринг в слаке.