Максим Павлунин

Руководитель Центра мониторинга, Angara SOC

Технологии машинного обучения все стремительней входят в нашу жизнь и находят применение в различных сферах деятельности. В медицине развивается диагностика различных заболеваний и методов лечения. В автомобильной промышленности машинное обучение успешно применяется в сервисах для помощи водителю и готовится полноценно забрать управление автомобилем на себя. В финансовой сфере ML используется для прогнозирования рынка и определения рисков. А что происходит в сфере информационной безопасности?

В ИБ технологии машинного обучения успешно применяются в системах защиты информации, таких как IDS/IPS, WAF, Antivirus, Anti-phishing и т.д. Все эти СЗИ обрабатывают огромные объемы данных, и выявление атак сопряжено с большой вариативностью техник злоумышленника. Конечно же сигнатурный анализ не готов уйти в историю и является основой для выявления угроз, но есть ряд существенных ограничений. В основном, правила направлены на выявление известных техник и не всегда способны учесть все возможные вариации, т.е. направлены на выявление ряда частных случаев. Соответственно, выявление неизвестных атак достаточно проблематично. К тому же количество правил/сигнатур стремительно растет, что затрудняет их сопровождение и приводит к высокому потреблению вычислительных мощностей. Модели машинного обучения позволяют решить эти проблемы, так как способны обучаться на больших данных и выявлять аномальные паттерны, которые невозможно учесть при сигнатурном анализе. В современных СЗИ данные технологии применяются совместно и дополняют друг друга.

Аналогичные проблемы существуют и в SOC относительно методик выявления инцидентов ИБ. Основой являются правила корреляции SIEM, которые в отдельных кейсах стремятся покрыть выявление всех возможных атак, но не могут этого сделать в виду различных ограничений. В отличие от СЗИ машинное обучение в инструментах SOC развито достаточно слабо. Мы тестировали ряд систем, таких как Anomaly Detection и UEBA, но широкого применения в SOC они не нашли из-за большого количества ложных срабатываний. Других инструментов, направленных на выявление инцидентов ИБ, нам, к сожалению, найти не удалось.

Как говорится, хочешь сделать хорошо - сделай сам! Вооружившись этой идеей, мы решили выделить некоторые точечные кейсы, с проблематикой описанной выше, и автоматизировать выявление с применением машинного обучения. В данной статье мы поделимся нашим опытом на примере выявления вредоносного кода PowerShell, соберем нейронную сеть и рассмотрим варианты ее использования совместно с SIEM.

Итак, что мы знаем о PowerShell? Это командный интерпретатор, встроенный в ОС Windows, который широко используется злоумышленниками при проведении различных атак, более подробно описан в T1059.001 MITRE. Существует большое количество инструментов на языке PowerShell, которые могут применяться как в составе ВПО, так и непосредственно злоумышленником при компрометации системы. PowerSploit, Empire, Nishang – это лишь малая часть общеизвестных коллекций подобных утилит для автоматизации действий злоумышленника, направленных на сбор информации, эксплуатацию уязвимостей, повышение привилегий и т.д. Но в каждой организации IT-служба использует и легитимные скрипты для автоматизации администрирования. Отличить легитимное от вредоносного иногда довольно сложно, а проанализировать миллионы скриптов для написания правил скорее невозможно.

Идея заключается в том, чтобы подготовить большой набор скриптов, классифицировать их и обучить на этом наборе модель нейронной сети. В результате обучения модель самостоятельно проанализирует образцы кода и выделит паттерны, свойственные как для легитимных, так и для вредоносных скриптов. Т.е. передать модели ту работу, которую лень проблематично сделать человеку. Хитрый план ????, не правда ли?

Формирование датасета

Для начала нам необходимо сформировать датасет из отсортированных образцов кода PowerShell. Для экономии времени мы будем использовать уже готовый, доступный по этой ссылке.

К данному датасету прилагается ресерч, в котором авторы объясняют принципы, по которым им удалось классифицировать образцы с применением классических моделей машинного обучения, таких как Random Forest. Мы же будем нарабатывать свой опыт с использованием алгоритмов глубокого обучения, т.е. искусственных нейронных сетей.

Нейронные сети обучаются на основе большого количества данных, они адаптируют свои веса, чтобы оптимизировать способность к распознаванию и классификации паттернов в данных. Сама сеть состоит из множества слоев с заданным количеством нейронов, каждый из которых выполняет вычисления над данными. Нейроны связанных между собой слоев имеют взаимосвязи в виде веса, т.е. величину связи. Обучение нейронных сетей обычно происходит с использованием алгоритмов обратного распространения ошибки, которые корректируют веса нейронов на основе разницы между предсказанными и фактическими значениями. Таким образом в ходе обучения проводится своего рода «работа над ошибками», в результате которой улучшается качество работы сети.

Скачаем и распакуем датасет. Из описания следует:

malicious_pure: содержит образцы вредоносного кода;

powershell_benign_dataset: содержит легитимные образцы кода;

mixed_malicious: в легитимный код встроены вредоносные элементы.

Файлы из mixed_malicious мы не будем использовать, чтобы не внести дисбаланс в текущий датасет, и оставим их для самостоятельных экспериментов. Итого имеем две части датасета для бинарной классификации: вредоносный/легитимный.

Создадим еще две директории для тестирования модели и переместим в них файлы:

test_ds_bad: 100 вредоносных образцов из malicious_pure;

test_ds_valid: 100 легитимных образцов из powershell_benign_dataset.

Проектировать модель нейронной сети мы будем на языке Python с использованием библиотеки TensorFlow. При построении модели будем придерживаться концепции классификации текстовой информации, в нашем случае в виде текста будет выступать код PowerShell.

Импортируем необходимые модули:

import os
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split
import re
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from matplotlib import pyplot as plt

Загрузка датасета

Напишем функцию, которая считает все файлы из директории, удалит ненужные знаки форматирования и поместит данные в массив.

def read_files(path):
    dataset = []
    for filename in os.listdir(path):
        with open(os.path.join(path, filename), encoding = 'utf-8') as f:
            line = f.read().replace('\n', ' ').replace('\t', ' ').replace('\r', ' ')
            dataset.append(re.sub(r'\s+', ' ', line))
    return dataset

Загрузим датасет из директорий и создадим массивы с метками:

0: для легитимного кода;

1: для вредоносного;

Т.е. для решения нашей задачи мы будем использовать 2 категории: 0 и 1.

# Загрузка датасета
train_bad = read_files('malicious_pure')
train_valid = read_files('powershell_benign_dataset')
test_bad = read_files('test_ds_bad')
test_valid = read_files('test_ds_valid')

# Объединяем массивы в единые датасеты
train_ds = train_bad + train_valid
test_ds = test_bad + test_valid
# Создаем массивы с метками
# 1 - malicious
# 0 - clean
y_train_bad = [1 for i in range(0, len(train_bad))]
y_train_valid = [0 for i in range(0, len(train_valid))]
y_train = y_train_bad + y_train_valid
y_test_bad = [1 for i in range(0, len(test_bad))]
y_test_valid = [0 for i in range(0, len(test_valid))]
y_test = y_test_bad + y_test_valid

На текущий момент у нас есть два массива с данными и два с метками. train_ds мы будем использовать для обучения, а test_ds - для оценки модели.

Далее необходимо преобразовать данные таким образом, чтобы модель могла оперировать определенными значениями – токенами. Для нас это будут элементы кода, такие как название переменных, функций, командлеты PowerShell и даже слова из комментариев. Для модели токены будут преобразованы в числовые значения.

Реализуем следующие идеи:

1) Заменим громоздкие массивы Base64, которые встречаются в коде на наш единый токен base64_string. Для модели подобные массивы будут бесполезны, т.к. будет много уникальных значений. Модель будет анализировать информацию в виде чисел и не сможет отличить Base64, скажем, от другого слова. Введение подобного токена поможет акцентировать на этом внимание.

2) Заменим IP адреса на два вида токенов: external_ip для внешних адресов и internal_ip для внутренних. IP адреса встречаются как в легитимном коде, так и во вредоносном, при этом нам совершенно не важно, что это за адрес. Достаточно понимать, внутренний он или внешний.

3) Часть спецсимволов будем использовать как разделитель. Скобки, кавычки и прочие элементы кода не несут особой смысловой нагрузки для решения нашей задачи, при этом мы будем использовать их в качестве разделителя и заменим на пробел.

4) Удалим лишние пробелы и мусор для того, чтобы уменьшить объем кода.

5) Приведем все символы к нижнему регистру, чтобы стандартизировать токены.

def clean_data_to_tokens(dataset):
    tokens = []
    char_replace = "()[]{},;'/\=:^<>|`+\""
    for source_string in dataset:
        source_string = re.sub(r"FromBase64String\(\'(.*)\'\)", "base64_string", source_string)
        source_string = re.sub(r"([a-zA-Z0-9\/\+=]{100,})+", "base64_string", source_string)
        for char in char_replace:
            source_string = source_string.replace(char, ' ')
        source_string = re.sub(r"(?<!\S)\d+(?!\S)", " ", source_string)
        source_string = re.sub(r"0x\S+", " ", source_string)
        ip_addresses = re.findall(r'(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)', source_string)
        for ip in ip_addresses:
            if ip.startswith('10.') or ip.startswith('192.') or ip.startswith('172.16.') or ip.startswith('127.0.'):
                source_string = source_string.replace(ip, 'internal_ip')
            else:
                source_string = source_string.replace(ip, 'external_ip')
        source_string = source_string.replace('.', ' ')
        tokens.append(re.sub(r'\s+', ' ', source_string.lower()))
    return tokens
# Обрабатываем данные
train_token_ds = clean_data_to_tokens(train_ds)
test_token_ds = clean_data_to_tokens(test_ds)

Посмотрим, что из этого получилось.

Отлично, вроде бы все планируемые преобразования работают, как задумано.

Выведем статистику по тренировочному датасету:

# Вывод статистики по датасету
def dataset_info(input_data):
    sent_lens = [len(sentence.split()) for sentence in input_data]
    avg_sent_len = np.mean(sent_lens)
    max_token_len = max([len(sentence.split()) for sentence in input_data])
    print('Средняя длинна в токенах: ', avg_sent_len) 
    print('Максимальная длинна в токенах: ', max_token_len)
    output_seq_len_95 = int(np.percentile(sent_lens, 95))
    print('Количество токенов в строке покрывающих 95% всех строк: ', output_seq_len_95)
    output_seq_len_97 = int (np. percentile(sent_lens, 97))
    print('Количество токенов в строке покрывающих 97% всех строк: ', output_seq_len_97)
    output_seq_len_99 = int(np.percentile(sent_lens, 99))
    print('Количество токенов в строке покрывающих 99% всех строк: ', output_seq_len_99)

# Cловарь с уникальными токенами
def dataset_vocab(input_data):
    unique_words = {}
    for data in input_data:
        words = data.split()
        for word in words:
            if word not in unique_words:
                unique_words[word] = 1
            else:
                unique_words[word] += 1
    # Сортируем словарь от большего к меньшему
    sorted_unique_words = {k:v for k,v in sorted(unique_words.items(), key=lambda item: item[1], reverse=True)}
    # Ограничение на вывод первых 100 элементов словаря
    sorted_unique_words = dict(list(sorted_unique_words.items())[:100])
    unique_words_count = len(unique_words.keys())
    print('Словарь уникальных токенов в исходных данных (Токен:Количество): \n', sorted_unique_words)
    print('Количество уникальных токенов в исходных данных: ', unique_words_count)
# Выводим статистику по тренировочному датасету
print('Информация по данным тренировочного датасета') 
dataset_info(train_token_ds)
dataset_vocab(train_token_ds)   

Эта статистика поможет нам определится с несколькими значениями, которые будут влиять как на потребление вычислительных мощностей при обучении, так и на точность. Это максимальная длина одного образца в токенах и размер словаря (количество уникальных токенов).

Подготавливаем данные для обучения

Самый большой образец в нашем датасете состоит из 29572, при этом 97% образцов не превышают 1589. Нам необходимо стандартизировать максимальное значение токенов в образцах, прежде чем мы будем переводить их в числовые значения для обучения модели. Все образцы с меньшим количеством токенов будут заполнены нулями до максимального значения. Слишком большие значения могут существенно повлиять на производительность.  В нашем примере возьмем небольшой запас и обрежем все строки, превышающие 2000 токенов.

# Функция обрезает строки в массиве до количества токенов max_token_count
def truncate_string(input_data, max_token_count):
  for index in range(len(input_data)):
    token_count = len(input_data[index].split())
    if token_count > max_token_count:
      input_data[index] = ' '.join(input_data[index].split()[:max_token_count])
  return input_data
max_token = 2000 # Значение в токенах покрывающее ~97% всех строк
train_token_ds = truncate_string(train_token_ds, max_token)
test_token_ds = truncate_string(test_token_ds, max_token)

Тренировочный датасет необходимо разделить на 2 части: для тренировки и валидации. Валидационная часть будет состоять из 20% случайно взятых образцов и использоваться для оценки эффективности обучения модели на каждой отдельной эпохе (итерации обучения). Преобразуем метки, объединим их с данными и окончательно сформируем датасеты:

train_dataset: для тренировки;

val_dataset: для валидации;

test_dataset: для тестирования.

# Разделяем тренировочный датасет на 2 части (для тренировки и валидации) в пропорции 80х20
train_tokens, val_tokens, train_labels, val_labels = train_test_split(train_token_ds, y_train, test_size=0.2, random_state=42)
# Отдельно сохраняем валидационные метки, далее будем использовать для тестирования модели.
val_labels_encoded = val_labels
# Кодируем метки OneHotEncoder
train_labels = pd.DataFrame(train_labels)
val_labels = pd.DataFrame(val_labels)
one_hot_encoder = OneHotEncoder(sparse_output=False)
train_labels = one_hot_encoder.fit_transform(train_labels.to_numpy().reshape(-1, 1))
val_labels = one_hot_encoder.transform(val_labels.to_numpy().reshape(-1, 1))
# Формируем датасеты для тренировки, валидации и проверки
train_dataset = tf.data.Dataset.from_tensor_slices((train_tokens, train_labels))
val_dataset = tf.data.Dataset.from_tensor_slices((val_tokens, val_labels))
test_dataset = tf.data.Dataset.from_tensor_slices((test_token_ds))
train_dataset = train_dataset.batch(16).prefetch(tf.data.AUTOTUNE)
val_dataset = val_dataset.batch(16).prefetch(tf.data.AUTOTUNE)
test_dataset = test_dataset.batch(16).prefetch(tf.data.AUTOTUNE)

Кодирование меток при помощи OneHotEncoder позволит создать по отдельному столбцу, содержащему бинарные значения (true\false) для каждой категории (0 и 1).

Векторизация данных

Слой TextVectorization используется для векторизации текстовых данных. Векторизация текста подразумевает преобразование текстовых строк в числовой формат, который может быть использован алгоритмами машинного обучения. После применения слоя TextVectorization, каждая текстовая строка будет представлена в виде числового вектора, где каждая компонента вектора соответствует определенному токену из текста.

Настроим слой и создадим словарь токенов на основе тренировочных данных. В качестве значения для максимального размера словаря возьмем 90000 самых часто встречающихся токенов по информации из статистики.

max_tokens_count = 90000 # Максимальный размер словаря нашего датасета ~91000 мы возьмем ТОП самых часто встречающихся токенов
# Создадим слой TextVectorization
text_vectorizer = layers.TextVectorization(max_tokens=max_tokens_count, # Максимальный размер словаря
                                           output_sequence_length=max_token, # максимальное количество токенов
                                           standardize=None) # Не используем стандартизацию т.к. уже преобразовали данные
text_vectorizer.adapt(train_tokens) # Формирование словаря

Слой Embedding используется для преобразования целочисленных индексов в плотные векторные представления (эмбеддинги), которые представляют семантическую схожесть и отношения между токенами. Создание эмбеддингов позволяет представить токены в непрерывном пространстве, где близкие по смыслу токены имеют близкие векторные представления.

token_embed = layers.Embedding(input_dim=max_tokens_count, # Максимальный размер словаря
                               output_dim=16, # Количество эмбеддингов
                               mask_zero=True)

Проектируем модель

Модель должна состоять из входного слоя, скрытых слоев и выходного слоя. Входной слой принимает данные и передает их в нижестоящий слой. Скрытые слои представляют из себя серию слоев с нейронами, которые обрабатывают данные. Выходной слой возвращает результат работы сети.

Мы будем использовать следующую модель:

inputs = layers.Input(shape=(1,), dtype=tf.string)
text_vectors = text_vectorizer(inputs) # Слой TextVectorization
token_embeddings = token_embed(text_vectors) # Слой Embedding
model = layers.Convolution1D(32, kernel_size=32, padding="same",activation="relu")(token_embeddings)
model = layers.MaxPooling1D(pool_size=(2))(model)
model = layers.Bidirectional(layers.LSTM(16))(model)
outputs = layers.Dense(2, activation="softmax")(model)
model_pwsh = tf.keras.Model(inputs, outputs)

Входной слой inputs принимает входные данные и передает их слою text_vectors для векторизации. Векторы преобразуются в эмбеддинги при помощи слоя token_embeddings. Слой Convolution1D применяет операцию свертки на входные векторные данные с использованием 32 фильтров и размером ядра 32. Данный слой позволяет автоматически извлекать локальные признаки из одномерных данных. MaxPooling1D уменьшает размерность фильтрованных признаков путем выбора наибольшего значения в каждом окне заданного размера, это позволит уменьшить количество вычислений в модели. Bidirectional LSTM использует двунаправленную LSTM. LSTM позволяет модели учитывать контекст и последовательность данных при прогнозировании результатов. На выходном слое данные будут классифицироваться двумя нейронами. Мы получим вероятностную величину для каждого класса, т.е. на сколько образец относится к вредоносному коду (1) и к легитимному (0).

Таким образом в текущей модели будет использоваться комбинация слоев, свойственных как сверточным сетям (Convolutional Neural Networks), так и рекуррентным (Recurrent Neural Networks).

Скомпилируем и выведем информацию по нашей модели:

model_pwsh.compile(loss="categorical_crossentropy",
                   optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
                   metrics=["accuracy"])
print(model_pwsh.summary())

Создадим callback для сохранения модели с лучшими показателями val_accuracy на диск:

checkpoint = tf.keras.callbacks.ModelCheckpoint(filepath='model_pwsh',
                                                monitor='val_accuracy',
                                                save_best_only=True)

Запустим обучение модели на 15 эпохах. Эпоха — это одна итерация обучения, на которой модель обучится на 100% образцов. Во время эпохи модель обновляет веса на основе ошибок, вычисленных на каждом шаге.  На шаге модель делает предсказания, руководствуясь значениями текущих весов по небольшому количеству образцов. Это количество в нашем случае равно 16 образцов, мы задали его при формировании датасета: train_dataset.batch(16). Одной эпохи, как правило, недостаточно для достижения хорошего результата, поэтому модель обучают на нескольких. В конце каждой эпохи модель сделает предсказание по валидационным данным и рассчитает результаты в виде метрик val_loss и val_accuracy. Это своего рода «экзамен» на неизвестных данных, результаты которого покажут динамику прогресса обучения.

model_history = model_pwsh.fit(train_dataset,
                              epochs=15,
                              validation_data=val_dataset,
                              callbacks=[checkpoint])

Построим графики статистики обучения:

# Loss
plt.plot(model_history.history['loss'], label='loss')
plt.plot(model_history.history['val_loss'], label='val_loss')
plt.title('Статистика обучения модели (loss)')
plt.ylabel('value')
plt.xlabel('No. epoch')
plt.legend(loc="upper right")
plt.show()
# Accuracy
plt.plot(model_history.history['accuracy'], label='accuracy')
plt.plot(model_history.history['val_accuracy'], label='val_accuracy')
plt.title('Статистика обучения модели (accuracy)')
plt.ylabel('value')
plt.xlabel('No. epoch')
plt.legend(loc="lower right")
plt.show()

По результатам обучения видно, что после 10-й эпохи показатель потерь на валидационных данных val_loss стал расти и точность val_accuracy уже не показала лучших результатов. Это связано с переобучением модели. Потери модели – это метрика, которая измеряет, насколько предсказанные значения отличаются от правильных на определенном наборе данных (тренировочном или валидационном). Чем меньше эта метрика, тем лучше. Переобучение – это явление, при котором модель показывает хорошие результаты на тренировочных данных, но на валидационных показатель потерь растет и точность падает. Т.е. модель уже не ищет закономерности для предсказания результата на неизвестных ей данных, а просто «заучивает» тренировочные. Именно для этого мы и сделали callback, чтобы сохранить модель с максимальной способностью для предсказания.

Тем не менее на тренировочных данных модель на лучшей 10-й эпохе смогла правильно классифицировать 99.73% образцов, а на валидационных - 99.16%.

Так как показатели потерь достаточно низкие и разница между accuracy и val_accuracy небольшая, можно сделать вывод, что модель хорошо справляется с нашей задачей.

Попробуем загрузить обученную модель из сохраненных файлов и классифицировать на ней наш тестовый и валидационный датасеты, т.к. они не участвовали в процессе обучения. Это позволит нам оценить результаты обучения модели.

# Функция для подготовки результатов обучения модели
def calculate_results(y_true, y_pred):
  # Расчет accuracy
  model_accuracy = accuracy_score(y_true, y_pred) * 100
  # Расчет precision, recall, f1
  model_precision, model_recall, model_f1, support = precision_recall_fscore_support(y_true, y_pred, average="weighted")
  model_results = {"accuracy": model_accuracy,
                  "precision": model_precision,
                  "recall": model_recall,
                  "f1": model_f1}
  return model_results

# Загрузка модели
test_model = tf.keras.models.load_model("model_pwsh")
# Тестирование модели
print('Тестирование модели на неизвестных данных')
# Получение результатов работы модели на тестовом датасете
model_test_probs = test_model.predict(test_dataset)
# Конвертация результатов в классы
model_test_preds = tf.argmax(model_test_probs, axis=1)
# Расчет результатов
model_test_results = calculate_results(y_true=y_test,
                                       y_pred=model_test_preds)
print('Результаты тестирования: \n',model_test_results)

print('Тестирование модели на валидационных данных')
model_test_probs = test_model.predict(val_dataset)
model_test_preds = tf.argmax(model_test_probs, axis=1)
model_test_results = calculate_results(y_true=val_labels_encoded,
                                       y_pred=model_test_preds)
print('Результаты тестирования: \n',model_test_results)

Accuracy, Precision, Recall и F1-мера являются популярными метриками, используемыми для оценки результатов обучения. Более подробно о метриках можно почитать тут или тут.

Accuracy — это общая доля правильно классифицированных образцов (как 1, так и 0) от общего числа образцов в наборе данных. Accuracy измеряет общую способность модели правильно предсказывать результаты.

Precision (точность) — это доля верно предсказанных положительных классов от всех прогнозируемых положительных классов. Высокое значение precision означает, что модель имеет низкую вероятность предсказывать ложно положительные результаты.

Recall (полнота) — это доля верно предсказанных положительных классов от всех объектов положительных классов. Высокое значение recall означает, что модель имеет низкую вероятность пропустить реальные положительные результаты.

F1-мера является гармоническим средним между precision и recall. F1-мера является показателем баланса между точностью и полнотой.

В нашем случае значения точности, полноты и F1-меры очень близки друг другу и могут подтвердить хорошие результаты обучения. По тестовым данным 100% образцов классифицированы правильно. Это говорит скорее о том, что 200 образцов, которые мы изъяли из общего датасета, содержали очень знакомые паттерны, по которым модель без особого труда смогла верно предсказать результат.

Конечно же подобная оценка модели далеко не идеальна, т.к. на других данных могли быть существенные отличия в результатах. Да и в целом все этапы построения модели являются всего лишь Proof of concept (POC), нежели Best practice. Абсолютно все можно сделать лучше и оптимизировать под конкретную задачу.

Обратите внимание, что разделение датасета на тестовый и валидационный осуществлялось путем случайного отбора. В таком случае, например, часть «сложных» образцов может попасть только в валидационные данные, и модель просто не сможет по ним корректно обучиться. Каждый цикл обучения будет уникальным, и на выходе могут быть разные результаты. Это можно компенсировать введением статичного разделения и руками перебирать тысячи образцов, что слишком трудозатратно, на мой взгляд. К тому же размер текущего датасета слишком мал для глубокого обучения и не подходит для боевой модели. Необходимо собрать как минимум десятки тысяч образцов, содержащих как реальные примеры скриптов, используемых в целевой инфраструктуре, так и образцы актуальных инструментов, используемых при проведении атак. Усложнение датасета приведет к тому, что придется пройти все шаги заново и оптимизировать модель, именно тогда она сможет показать очень достойные результаты. В ходе оптимизации можно будет экспериментировать с составом и объемом слоев, скоростью обучения и другими параметрами. Хочу посоветовать материалы от Microsoft с описанием модели, используемой в Windows Defender. В них описана аналогичная модель по архитектуре, но она обрабатывает два входных потока данных, что приводит к хорошей прибавке в точности на сложных датасетах.

Использование ML-моделей в SOC

Итак, у нас есть обученная модель, но что же с ней делать, если в вашей SIEM нет такого функционала? Придется сделать его самому!

Практически все SIEM поддерживают запросы к БД через API, соответственно, мы можем запросить любое событие и обрабатывать его за пределами системы. Источником данных для решения нашей задачи будут являться журналы ОС Windows, которые подключены и анализируются любым SOC. Нас интересует журнал «PowerShell Operational», а конкретно событие ID 4104. В поле «ScriptBlockText» содержатся фрагменты кода PowerShell, который запускается в ОС. Большие скрипты разбиваются на несколько фрагментов, что для нас является больше плюсом при стандартизации максимальной длинны образца. Обфускация кода злоумышленником также становится меньшей проблемой, т.к. сам PowerShell (v5.1+) проведет деобфускацию перед запуском и запишет отдельное событие с исходным кодом.

Необходимо написать скрипт (ML client), который запросит события из SIEM, обработает данные таким же образом, как мы их готовили выше, и передаст в обученную модель. Модель вернет вердикт, который можно записать в исходное событие в виде тега или отдельного поля.

Далее при помощи правил корреляции можно обрабатывать события с тегом, вносить исключения и передавать алерты в IRP-систему для анализа.

На практике подобная модель генерирует единичные ложные срабатывания при обработке нескольких сотен тысяч образцов, что, на мой взгляд, является хорошим показателем для использования в SOC.

Использование моделей машинного обучения совместно с классическим сигнатурным анализом имеет большой потенциал для развития методов выявления инцидентов ИБ. При помощи данного подхода выявление неизвестных угроз уже не звучит так футуристично. На рассмотренном нами примере модель самостоятельно выделила паттерны для классификации кода, которые невозможно учесть в правилах корреляции. Только представьте, сколько кейсов обладают подобной проблематикой и могут быть автоматизированы. Помимо новых методов выявления инцидентов, машинное обучение сможет помочь и в повышении точности работы самих правил корреляции, фильтрации ложных срабатываний, автоматизации алгоритмов расследования и многих других задачах SOC.

Комментарии (2)


  1. anonymous
    02.08.2023 06:19

    НЛО прилетело и опубликовало эту надпись здесь


  1. landstalker
    02.08.2023 06:19
    -2

    Зачем? Если через пару лет будет только RuLdap, возможно даже на 1С написаный. Хотя сам концепт, можно попробовать под любой скриптовый интерпретируемый язык подстроить.