В одном из проектов встала следующая задача: пользователь загружает пачку файлов через клиента (CloudBerry Explorer, к примеру) в S3 бакет, мы копируем эти файлы в архив и шлем SNS уведомление о том, что все сделано. Перекладывать файлы в архив нужно начинать только тогда, когда пользователь загрузит все, что хотел. Пользователей мало и загружают батчи они довольно редко. Но файлов может быть много.
Чтобы понять, что пора начинать архивацию, зададим определенную структуру каталогов и будем просить пользователя загружать триггер-файлы с расширением .trigger когда он закончит. Этакая эмуляция кнопки Done. Структура каталогов будет такой:
<batch_name>/done.trigger
<batch_name>/files/<file_key_1>
<batch_name>/files/<file_key_2>
...
<batch_name>/files/<file_key_n>
Как видим, для каждой пачки создается свой каталог <batch_name>
с подкаталогом files, в который и заливаются уже пользовательские файлы с каталогами и именами, которые он хочет. Триггер-файл загружается в <batch_name>
и по ключу этого файла можно понять какие конкретно файлы нужно отправить в архив. Но здесь есть один нюанс, мы хотим при копировании в архив вырезать каталог files
. Т.е. файл <batch_name>/files/<file_key_1>
скопировать в <batch_name>/<file_key_1>
.
К счастью, S3 позволяет отслеживать загрузку файлов с определенным суффиксом и отправлять уведомления при наструплении этого события. В качестве получаетеля этих уведомлений можно указать аж 3 сервиса: SNS, SQS и Lambda-функцию. Но тут не без нюансов. Так, первые 2 типа поддерживают только стандартные очереди и SNS, а FIFO не поддерживают, увы.
Мы же, в случае загрузки триггера, будем ставить задачу в стандартную очередь SQS и в приложении будем эту очередь опрашивать. На питоне это выглядит примерно так:
…
session = boto3.Session(profile_name=profile)
queue = session.resource('sqs', region_name=region).Queue(queue_url)
while True:
# Здесь можно усыпить процесс, если хочется
messages = queue.receive_messages(AttributeNames=('ALL',), WaitTimeSeconds=20)
if not messages:
continue
message = messages[0]
# Process message
Окей, мы разобрались как понять, что пользователь закончил заливать свои файлы и пора их архивировать.
Теперь нам нужно скопировать файлы в архив и отправить SNS уведомление. Уведомление нужно отправлять только после того как все файлы уедут в архив. Что нам может предложить для такого случая Амазон?
S3 Batch Operations
Амазон позволяет копировать объекты батчами между бакетами и даже между аккаунтами. Но функционал этот довольно ограничен. Рассмотрим чуть подробнее.
Поставить задачу на копирование можно двумя способами: с использованием отчета об инвентаризации и с помощью CSV манифеста.
Оба способа очень похожи друг на друга и обладают одним и тем же недостатком - они оба недостаточно гибкие. Так к примеру, при постановке задачи на копирование мы можем указать список копируемых файлов из батча в CSV манифесте, а так же можем указать каталог куда все эти файлы скопировать, но выбрать произвольное имя или вообще как-то модифицировать ключ, увы, нельзя. К примеру, если у нас в файле указаны загруженные пользователем файлы из примера выше,<batch_name>/done.trigger
<batch_name>/files/<file_key_1>
<batch_name>/files/<file_key_2>
...
<batch_name>/files/<file_key_n>
то можно все эти файлы загрузить в каталог, скажем, archive
и получить пути вида archive/<batch_name>/files/<file_key_1>
, но вот каталог files
из пути не выдрать, т.е. файл <batch_name>/files/<file_key_1>
скопировать в archive/<batch_name>/<file_key_1>
, увы, нельзя.
В случае с использованием сервиса инвентаризации все то же самое, только вместо манифеста - отчет с похожим содержимым. А очень бы хотелось на самом деле.
К слову, задачи на копирование батчами можно ставить как программно, так и через Консоль.
SDK Python
Когда гибкости стандартных S3 Batch Operations не хватает, Амазон советует пользоваться SDK, что мы и сделаем. У нас питон, а для питона синхронная либа boto3. boto3 под капотом делает вызовы к REST API амазоновских сервисов, поэтому операция копирования файла сводится к отправке HTTP запроса, а это значит, что мы можем немного распараллелить процесс копирования. Однопоточный синхронный код значительно проще конечно, что мне очень импонирует, но его производительности, к сожалению, будет не достаточно, т.к. файлов может быть много, а очень долго ждать не хочется. В итоге имеем вот такой простенький скриптик:
from concurrent.futures import as_completed
from concurrent.futures.thread import ThreadPoolExecutor
import boto3
import click
def get_source_prefix(message_body: str) -> str:
...
def modify_key(key) -> str:
...
@click.command()
@click.option('--queue-url')
@click.option('--source-bucket')
@click.option('--target-bucket')
@click.option('--target-prefix')
@click.option('--max-workers')
@click.option('--region')
@click.option('--profile')
def main(queue_url, source_bucket, target_bucket, target_prefix, max_workers, region, profile):
session = boto3.Session(profile_name=profile)
queue = session.resource('sqs', region_name=region).Queue(queue_url)
s3_client = session.client('s3')
paginator = s3_client.get_paginator('list_objects_v2')
while True:
# Здесь можно усыпить процесс, если очень хочется
# По умолчанию, метод возвращает одно сообщение
messages = queue.receive_messages(AttributeNames=('ALL',), WaitTimeSeconds=20)
if not messages:
continue
message = messages[0]
kwargs = {'Bucket': source_bucket, 'Prefix': f'{get_source_prefix(message.body)}/files'}
keys = [k for page in paginator.paginate(**kwargs) for k in page.get('Contents', [])]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures_to_keys = {
executor.submit(
s3_client.copy,
{'Bucket': source_bucket, 'Key': key['Key']},
target_bucket,
f'{target_prefix}/{modify_key(key["Key"])}'
): key
for key in keys if not key['Key'].endswith('.trigger')
}
as_completed(futures_to_keys)
# шлем SNS уведомление
...
message.delete()
if __name__ == '__main__':
main()
Комментарии (4)
shachneff
15.09.2021 09:40Амазон хорошо, но для персданных в РФ не подходит. Подскажите, а яндекс или майлру клауды так же умеют?
romblin Автор
15.09.2021 12:55Про российские облака я ничего толком не знаю. Знаю только, что у них есть хранилища типа S3, как там все работает не знаю. С Амазоном бы хотя бы разобраться))
SpAwN_gUy
Так и не увидел: кто что архивирует? Бэкап/копирование - вижу. Архивирования чот не вижу.
И, мне кажется, что Архивирования в зип не будет многопоточным. В 7зип может и будет, но файлы по очереди читает всеравно
Можно ещё через s3 по паттерну (.trigger) запускать лямбду. А эту лямбду написать на питоне. Так же все примеры по ресайзу картинок в с3 работают.
romblin Автор
Под архивацией я имел ввиду копирование в архивный бакет, бэкап/копирование. Оттуда файлы потом в ледник уезжают. Похоже я не очень хорошо подобрал термин