MLflow - это инструмент для управления жизненным циклом машинного обучения: отслеживание экспериментов, управление и деплой моделей и проектов. В этом руководстве мы посмотрим, как организовать эксперименты и запуски, оптимизировать гиперпараметры с помощью optuna, сравнивать модели и выбирать лучшие параметры. Также рассмотрим логирование моделей, использование их в разных форматах, упаковку проекта в MLproject и установку удаленного Tracking Server MLflow.
Вы можете просто прочитать статью или повторить все шаги локально. Также по ссылке репозиторий, который является расширенной версией этого гайда на английском в README.
Подготовка env
Сначала нам нужно установить conda
и создать новоет окружение:
name: mlflow-example
channels:
- conda-forge
- defaults
dependencies:
- python=3.11.4
- pip=24.0
- mlflow=2.14.2
- xgboost=2.0.3
- jupyter=1.0.0
- loguru=0.7.2
- shap=0.45.1
- pandas=2.2.2
- scikit-learn=1.5.1
- scipy=1.14.0
- numpy=1.26.4
- jupytext=1.16.3
- psutil=6.0.0
- boto3=1.34.148
- psycopg2-binary=2.9.9
- pip:
- optuna==3.6.1
- mlserver==1.3.5
- mlserver-mlflow==1.3.5
- mlserver-xgboost==1.3.5
Можно установить вручную или сохранить этот файл локально и вызвать:
conda env create -f conda.yaml
Теперь можно использовать это окружение в IDE или в терминале:
conda activate mlflow-example
MLflow UI
Выполните в терминале mlflow ui
, чтобы запустить MLflow на localhost:5000.
Mlflow будет запущен в локальном режиме, по умолчанию он создаст папку mlruns
для хранения артефактов и метаинформации.
Mlflow Projects
MLflow Project - это подход к структурированию проекта, следуя конвенциям, которые упрощают запуск проекта для других разработчиков (или автоматических инструментов). Каждый проект — это директория файлов или git-репозиторий, содержащий ваш код. MLflow может запускать эти проекты, основываясь на определенных правилах организации файлов в директории.
Файл MLproject помогает MLflow и другим пользователям понять и запустить ваш проект, указывая окружение, точки входа и возможные параметры для настройки:
name: Cancer_Modeling
conda_env: conda.yaml
entry_points:
data-preprocessing:
parameters:
test-size: {type: float, default: 0.33}
command: "python data_preprocessing.py --test-size {test-size}"
hyperparameters-tuning:
parameters:
n-trials: {type: int, default: 10}
command: "python hyperparameters_tuning.py --n-trials {n-trials}"
model-training:
command: "python model_training.py"
data-evaluation:
parameters:
eval-dataset: {type: str}
command: "python data_evaluation.py --eval-dataset {eval-dataset}"
Мы можем запускать эндпоинты проекта, используя интерфейс командной строки (CLI), либо API на Python.
Mlflow Experiments and Mlflow Runs
MLflow Experiments и MLflow Runs - это основные абстракции для структурирования проекта. Давайте рассмотрим пример разработки модели на открытом датасете с данными по забалеванию раком.
Data preprocessing
Этот шаг обычно более сложный, но здесь мы просто загрузим набор данных, разделим его на обучающую и тестовую выборки, залогируем несколько метрик в MLflow (количество образцов и признаков) и передадим сами наборы данных в артефакты MLflow.
import sys
import argparse
import mlflow
import warnings
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn import datasets
from loguru import logger
from config import config
# set up logging
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
warnings.filterwarnings('ignore')
def get_cancer_df():
cancer = datasets.load_breast_cancer()
X = pd.DataFrame(cancer.data, columns=cancer.feature_names)
y = pd.Series(cancer.target)
logger.info(f'Cancer data downloaded')
return X, y
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--test-size", default=config.default_test_size, type=float)
TEST_SIZE = parser.parse_args().test_size
logger.info(f'Data preprocessing started with test size: {TEST_SIZE}')
# download cancer dataset
X, y = get_cancer_df()
# add additional features
X['additional_feature'] = X['mean symmetry'] / X['mean texture']
logger.info('Additional features added')
# log dataset size and features count
mlflow.log_metric('full_data_size', X.shape[0])
mlflow.log_metric('features_count', X.shape[1])
# split dataset to train and test part and log sizes to mlflow
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=TEST_SIZE)
mlflow.log_metric('train_size', X_train.shape[0])
mlflow.log_metric('test_size', X_test.shape[0])
# log and register datasets
train = X_train.assign(target=y_train)
mlflow.log_text(train.to_csv(index=False),'datasets/train.csv')
dataset_source_link = mlflow.get_artifact_uri('datasets/train.csv')
dataset = mlflow.data.from_pandas(train, name='train', targets="target", source=dataset_source_link)
mlflow.log_input(dataset)
test = X_test.assign(target=y_test)
mlflow.log_text(test.to_csv(index=False),'datasets/test.csv')
dataset_source_link = mlflow.get_artifact_uri('datasets/test.csv')
dataset = mlflow.data.from_pandas(train, name='test', targets="target", source=dataset_source_link)
mlflow.log_input(dataset)
logger.info('Data preprocessing finished')
Чтобы выполнить этот код сохраните его в файл data-preprocessing.py
рядом с файлом MLproject
и в командной строке вызовете:
mlflow run . --entry-point data-preprocessing --env-manager local --experiment-name Cancer_Classification --run-name Data_Preprocessing -P test-size=0.33
Теперь этот запуск должен быть доступен через ui. Секция datasets used заполнена благодаря mlflow.data api. Однако важно понимать, что это метаданные о даных, а не сами данные.
Модуль mlflow.data отслеживает информацию о наборах данных во время обучения и оценки модели: признаки, целевые значения, предсказания, название, схема и источник. Эти метаданные логируются с помощью
mlflow.log_input()
.
Hyperparameters tuning
В этой части мы используем optuna
для нахождения лучших гиперпараметров для XGBoost, используя встроенную кросс-валидацию для обучения и оценки модели. Кроме того, мы посмотрим, как собирать метрики во время процесса обучения.
Здесь мы используем conda
в качестве env менджера: mlflow
автоматически создаст для нас новый env из файла, указанного в конфигурации MLproject
.
mlflow run . --entry-point hyperparameters-tuning --env-manager conda --experiment-name Cancer_Classification --run-name Hyperparameters_Search -P n-trials=10
Посмотрим на результаты в MLflow UI. Для структурирования проекта можно использовать вложенные запуски. Есть один основной запуск для настройки гиперпараметров, а все испытания собираются как вложенные запуски. MLflow также предоставляет возможность настраивать столбцы и порядок строк в этом представлении:
В сhart view можно сравнить запуски и настроить различные диаграммы. Использование XGBoost callbacks для логирования метрик в процессе обучения модели позволяет создавать графики с количеством деревьев на оси x.
Выберите несколько запусков, нажмите кнопку сравнения и выберите наиболее полезное представление. Функция mlflow compare может быть полезной при оптимизации гиперпараметров, так как помогает уточнить и корректировать границы возможных интервалов на основе результатов сравнения.
Системные метрики также могут отслеживаться на протяжении всего запуска. Хотя это и не дает точной оценки реальных требований, в некоторых случаях все же может быть полезным.
Log and register model
Возможно, но не обязательно, сохранять модель для каждого эксперимента и запуска. В большинстве случаев лучше сохранять параметры, а затем, выбрав лучшие, выполнить дополнительный запуск для сохранения модели. Здесь мы следуем той же логике: используем параметры из лучшего запуска для сохранения финальной модели и регистриреум ее для версионирования и использования через короткую ссылку.
import os
import sys
import tempfile
import mlflow
import warnings
import logging
import xgboost as xgb
import pandas as pd
from loguru import logger
# set up logging
warnings.filterwarnings('ignore')
logging.getLogger('mlflow').setLevel(logging.ERROR)
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
if __name__ == '__main__':
logger.info('Model training started')
mlflow.xgboost.autolog()
with mlflow.start_run() as run:
experiment_id = run.info.experiment_id
run_id = run.info.run_id
logger.info(f'Start mlflow run: {run_id}')
# get last finished run for data preprocessing
last_data_run_id = mlflow.search_runs(
experiment_ids=[experiment_id],
filter_string=f"tags.mlflow.runName = 'Data_Preprocessing' and status = 'FINISHED'",
order_by=["start_time DESC"]
).loc[0, 'run_id']
# download train and test data from last run
with tempfile.TemporaryDirectory() as tmpdir:
mlflow.artifacts.download_artifacts(run_id=last_data_run_id, artifact_path='datasets/train.csv', dst_path=tmpdir)
mlflow.artifacts.download_artifacts(run_id=last_data_run_id, artifact_path='datasets/test.csv', dst_path=tmpdir)
train = pd.read_csv(os.path.join(tmpdir, 'datasets/train.csv'))
test = pd.read_csv(os.path.join(tmpdir, 'datasets/test.csv'))
# convert to DMatrix format
features = [i for i in train.columns if i != 'target']
dtrain = xgb.DMatrix(data=train.loc[:, features], label=train['target'])
dtest = xgb.DMatrix(data=test.loc[:, features], label=test['target'])
# get last finished run for hyperparameters tuning
last_tuning_run = mlflow.search_runs(
experiment_ids=[experiment_id],
filter_string=f"tags.mlflow.runName = 'Hyperparameters_Search' and status = 'FINISHED'",
order_by=["start_time DESC"]
).loc[0, :]
# get best params
params = {col.split('.')[1]: last_tuning_run[col] for col in last_tuning_run.index if 'params' in col}
params.update(eval_metric=['auc', 'error'])
mlflow.log_params(params)
model = xgb.train(
dtrain=dtrain,
num_boost_round=int(params["num_boost_round"]),
params=params,
evals=[(dtest, 'test')],
verbose_eval=False,
early_stopping_rounds=10
)
mlflow.log_metric("accuracy", 1 - model.best_score)
# Log model as Booster
input_example = test.loc[0:10, features]
predictions_example = pd.DataFrame(model.predict(xgb.DMatrix(input_example)), columns=['predictions'])
mlflow.xgboost.log_model(model, "booster", input_example=input_example)
mlflow.log_text(predictions_example.to_json(orient='split', index=False), 'booster/predictions_example.json')
# Register model
model_uri = f"runs:/{run.info.run_id}/booster"
mlflow.register_model(model_uri, 'CancerModelBooster')
# Log model as sklearn completable XGBClassifier
params.update(num_boost_round=model.best_iteration)
model = xgb.XGBClassifier(**params)
model.fit(train.loc[:, features], train['target'])
mlflow.xgboost.log_model(model, "model", input_example=input_example)
# log datasets
mlflow.log_text(train.to_csv(index=False), 'datasets/train.csv')
mlflow.log_text(test.to_csv(index=False),'datasets/test.csv')
logger.info('Model training finished')
# Register the model
model_uri = f"runs:/{run.info.run_id}/model"
mlflow.register_model(model_uri, 'CancerModel')
logger.info('Model registered')
mlflow run . --entry-point model-training --env-manager conda --experiment-name Cancer_Classification --run-name Model_Training -P n-trials=10
Благодаря функции mlflow.xgboost.autolog()
, все метрики автоматически логируются, в том числе в процессе обучения:
После сохранения модели, мы можем получить доступ к странице артефактов внутри запуска. В артефактах можно хранить любые типы файлов, такие как пользовательские графики, текстовые файлы, изображения, наборы данных, скрипты Python и другие. Например, я конвертировал рабочий ноутбук в HTML и сохранил его как артефакт:
Для каждой модели MLflow автоматически создаёт YAML конфигурационный файл, называемый MLmodel. Этот файл можно просмотреть в интерфейсе MLflow или скачать и изучить.
artifact_path: model
flavors:
python_function:
data: model.xgb
env:
conda: conda.yaml
virtualenv: python_env.yaml
loader_module: mlflow.xgboost
python_version: 3.11.4
xgboost:
code: null
data: model.xgb
model_class: xgboost.sklearn.XGBClassifier
model_format: xgb
xgb_version: 2.0.3
mlflow_version: 2.14.2
model_size_bytes: 35040
model_uuid: 516954aae7c94e91adeed9df76cb4052
run_id: bf212703d1874eee9dcb37c8a92a6de6
saved_input_example_info:
artifact_path: input_example.json
pandas_orient: split
type: dataframe
signature:
inputs: '[{"type": "double", "name": "mean radius", "required": true}, {"type":
"double", "name": "mean texture", "required": true}, {"type": "double", "name":
"mean perimeter", "required": true}, {"type": "double", "name": "mean area", "required":
true}, {"type": "double", "name": "mean smoothness", "required": true}, {"type":
"double", "name": "mean compactness", "required": true}, {"type": "double", "name":
"mean concavity", "required": true}, {"type": "double", "name": "mean concave
points", "required": true}, {"type": "double", "name": "mean symmetry", "required":
true}, {"type": "double", "name": "mean fractal dimension", "required": true},
{"type": "double", "name": "radius error", "required": true}, {"type": "double",
"name": "texture error", "required": true}, {"type": "double", "name": "perimeter
error", "required": true}, {"type": "double", "name": "area error", "required":
true}, {"type": "double", "name": "smoothness error", "required": true}, {"type":
"double", "name": "compactness error", "required": true}, {"type": "double", "name":
"concavity error", "required": true}, {"type": "double", "name": "concave points
error", "required": true}, {"type": "double", "name": "symmetry error", "required":
true}, {"type": "double", "name": "fractal dimension error", "required": true},
{"type": "double", "name": "worst radius", "required": true}, {"type": "double",
"name": "worst texture", "required": true}, {"type": "double", "name": "worst
perimeter", "required": true}, {"type": "double", "name": "worst area", "required":
true}, {"type": "double", "name": "worst smoothness", "required": true}, {"type":
"double", "name": "worst compactness", "required": true}, {"type": "double", "name":
"worst concavity", "required": true}, {"type": "double", "name": "worst concave
points", "required": true}, {"type": "double", "name": "worst symmetry", "required":
true}, {"type": "double", "name": "worst fractal dimension", "required": true},
{"type": "double", "name": "additional_feature", "required": true}]'
outputs: '[{"type": "long", "required": true}]'
params: null
utc_time_created: '2024-07-30 08:38:11.745511'
Файл MLmodel
содержит информациб по оберткам/вкусам (flavours), здесь это pyfunc и xgboost. Также он включает настройки окружения для conda (conda.yaml) и virtualenv (python_env.yaml). Модель является классификатором XGBoost, совместимым с API sklearn, сохранённым в формате XGBoost версии 2.0.3. Файл отслеживает такие детали, как размер модели, UUID, run ID и время создания. Также есть пример входных данных, для модели и её сигнатуроа - спецификация входа и выхода. Сигнатура может быть создана и сохранена вручную вместе с моделью, но MLflow автоматически генерирует сигнатуру при предоставлении примера входа.
В экосистеме MLflow, flavors (вкусы) — это обертки для конкретных библиотек машинного обучения, которые позволяют сохранять, логировать и извлекать модели в едином формате. Это обеспечивает единообразное поведение метода предсказания (predict) для различных фреймворков, упрощая управление и развертывание моделей.
Добавление предсказаний для примера входных данных может быть полезным, так как позволяет сразу протестировать модель после её загрузки и убедиться в её корректной работе в различных окружениях.
Мы сохранили две версии модели, обе с использованием xgboost, каждая из которых имеет два "flavors": python_function
и xgboost
. Разница заключается в классе модели: для бустера это xgboost.core.Booster
, а для модели это xgboost.sklearn.XGBClassifier
, который поддерживает API, совместимый с scikit-learn. Эти различия влияют на работу метода predict
, поэтому важно просмотреть файл MLmodel и проверить сигнатуру модели перед использованием. Также могут быть небольшие различия в производительности python_function
обычно немного медленее.
При загрузке модели бустера с использованием xgboost, модель ожидает, что входные данные будут в форме объекта DMatrix, и метод predict в нашем случае будет выдавать оценки (а не классы).
Built-in evaluation
Встроенная функция mlflow.evaluate
позволяет оценивать модели на дополнительных наборах данных:
import sys
import os
import argparse
import warnings
import logging
import mlflow
import pandas as pd
from loguru import logger
logger.remove()
logger.add(sys.stdout, format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
warnings.filterwarnings('ignore')
logging.getLogger('mlflow').setLevel(logging.ERROR)
if __name__ == '__main__':
logger.info('Evaluation started')
parser = argparse.ArgumentParser()
parser.add_argument("--eval-dataset", type=str)
eval_dataset = pd.read_csv(parser.parse_args().eval_dataset)
with mlflow.start_run() as run:
eval_dataset = mlflow.data.from_pandas(
eval_dataset, targets="target"
)
last_version = mlflow.MlflowClient().get_registered_model('CancerModel').latest_versions[0].version
mlflow.evaluate(
data=eval_dataset, model_type="classifier", model=f'models:/CancerModel/{last_version}'
)
logger.success('Evaluation finished')
mlflow run . --entry-point data-evaluation --env-manager conda --experiment-name Cancer_Classification --run-name Data_Evaluation -P eval-dataset='test.csv'
Результаты можно просмотреть в интерфейсе MLflow, где представлены различные метрики и графики, включая ROC-AUC, матрицы ошибок и графики SHAP (если SHAP установлен).
Model Serving
MLflow имеет встроенные возможности для деплоя моделей. Деплой модели с использованием Flask довольно просто сделать: mlflow models serve -m models:/CancerModel/1 --env-manager local
. Но мы также можем использовать mlserver
.
Пакет
mlserver
облегчает эффективное развертывание и обслуживание моделей машинного обучения с поддержкой множества фреймворков, используя интерфейсы REST и gRPC. Он интегрируется с Seldon Core для масштабируемого и надёжного управления моделями и мониторинга.
Возможно, вам потребуется установить следующие пакеты: mlserver
, mlserver-mlflow
, mlserver-xgboost
, если вы используете собственное окружение. После этого мы можем настроить конфигурационный файл (model-settings.json) для MLServer. Это позволяет нам изменить работу API; здесь мы просто настраиваем алиас для модели:
{
"name": "cancer-model",
"implementation": "mlserver_mlflow.MLflowRuntime",
"parameters": {
"uri": "models:/CancerModel/1"
}
}
Чтобы запустить MLServer в локальном окружении, можно использовать команду mlserver start .
Теперь у нас есть рабочий API с документацией OpenAPI, валидацией запросов, HTTP и gRPC серверами и эндпоинтом с метриками для prometheus. И всё это без написания кода, все что нужноя - простая конфигурацию в формате JSON.
Мы можем проверить документацию для нашей модели и изучить ожидаемую структуру данных через Swagger по адресу /v2/models/cancer/model/docs
Мы можем получить доступ к эндпоинту с метриками или настроить prometheus для их сбора
Теперь можно отправлять запросы к модели:
import requests
import json
url = "http://127.0.0.1:8080/invocations"
# convert df do split format and then to json
input_data = json.dumps({
"params": {
'method': 'proba',
},
'dataframe_split': {
"columns": test.columns.tolist(),
"data": test.values.tolist()
}
})
# Send a POST request to the MLflow model server
response = requests.post(url, data=input_data, headers={"Content-Type": "application/json"})
if response.status_code == 200:
prediction = response.json()
print("Prediction:", prediction)
else:
print("Error:", response.status_code, response.text)
Prediction: {'predictions': [1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0, 1, 1]}
Customize model
Мы можем настроить нашу модель для предоставления вероятностей или включения дополнительного логирования. Для этого сначала скачаем модель, а затем обернем её в пользовательскую обертку:
import mlflow
import mlflow.xgboost
import mlflow.pyfunc
# Step 1: Download the Existing Model from MLflow
model_uri = "models:/CancerModel/1"
model = mlflow.xgboost.load_model(model_uri)
# Step 2: Define the Custom PyFunc Model with `loguru` Setup in `load_context`
class CustomPyFuncModel(mlflow.pyfunc.PythonModel):
def __init__(self, model):
self.model = model
def get_logger(self):
from loguru import logger
logger.remove()
logger.add("mlserve/mlserver_logs.log", format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}")
return logger
def load_context(self, context):
self.logger = self.get_logger()
def predict(self, context, model_input):
self.logger.info(f"start request")
self.logger.info(f"batch size: {len(model_input)}")
predict = self.model.predict_proba(model_input)[:,1]
self.logger.success(f"Finish request")
return predict
# Step 3: Save the Wrapped Model Back to MLflow
with mlflow.start_run() as run:
mlflow.pyfunc.log_model(
artifact_path="custom_model",
python_model=CustomPyFuncModel(model),
registered_model_name="CustomCancerModel",
)
Теперь перепишем конфигурационный файл и заново запустим mlserver:
{
"name": "cancer-model",
"implementation": "mlserver_mlflow.MLflowRuntime",
"parameters": {
"uri": "models:/CustomCancerModel/1"
}
}
Отправим запросы к новому API:
# Send a POST request to the MLflow model server
response = requests.post(url, data=input_data, headers={"Content-Type": "application/json"})
if response.status_code == 200:
prediction = response.json()
print("Prediction:", prediction['predictions'][:10])
else:
print("Error:", response.status_code, response.text)
Prediction: [0.9406537413597107, 0.9998677968978882, 0.9991995692253113, 0.00043031785753555596, 0.9973010420799255, 0.9998010993003845, 0.9995433688163757, 0.9998323917388916, 0.0019207964651286602, 0.0004339178267400712]
Теперь мы получили, веротяности, также можно проверить что дополнительные логи записаны в файл:
2024-07-30 12:02:38 | INFO | start request
2024-07-30 12:02:38 | INFO | batch size: 188
2024-07-30 12:02:38 | SUCCESS | Finish request
Альтернативой этому подходу является написание нового API с нуля. Это может быть лучше, когда нам нужна большая гибкость, дополнительная функциональность или когда мы ограничены во времени, так как использование MLServer может быть немного медленнее.
В случае периодического переобучения модели нам нужно обновлять модель в сервисе или перезапускать деплой в открытой версии MLflow я не нашел такого функционала. Версия Databricks включает функцию вебхуков, которая позволяет MLflow уведомлять API о новых версиях. Другим вариантом является запуск деплоя при обновлении модели. Мы также можем открыть дополнительный эндпоинт в сервере и вызывать его в рамках DAG, или сконфигурирвовать сервер периодически запрашивать обновления в MLflow.
MLflow Tracking Server
MLflow Local Setup
До этого мы работали с MLflow локально: метаданные и артефакты хранятся в папке по умолчанию mlruns
. Вы можете проверить эту папку у себя, если успешно выполнили все предыдущие шаги. Мы можем изменить место хранения метаданных MLflow, указав другой backend-store-uri
при запуске команды MLflow UI. Например, чтобы использовать другую папку (mlruns_new
), выполните следующие действия: mlflow ui --backend-store-uri ./mlruns_new
И поменяйте tracking uri в проекте: mlflow.set_tracking_uri("file:./mlruns_new")
.
Remote Tracking
В продакшене мы обычно настраиваем удалённый сервер отслеживания с хранилищем артефактов и базой данных для метаданных MLflow. Мы можем симулировать эту конфигурацию, используя MinIO для хранения артефактов и PostgreSQL для базы данных. Вот простой файл Docker Compose с 4 сервсиами:
MinIO как s3 подобное хранилище
Клиент MinIO (minio/mc) для создания бакета для MLflow
PostgreSQL в качестве базы данных для метаинформации
MLflow UI
services:
s3:
image: minio/minio
container_name: mlflow_s3
ports:
- 9000:9000
- 9001:9001
command: server /data --console-address ':9001'
environment:
- MINIO_ROOT_USER=mlflow
- MINIO_ROOT_PASSWORD=password
volumes:
- minio_data:/data
init_s3:
image: minio/mc
depends_on:
- s3
entrypoint: >
/bin/sh -c "
until (/usr/bin/mc alias set minio http://s3:9000 mlflow password) do echo '...waiting...' && sleep 1; done;
/usr/bin/mc mb minio/mlflow;
exit 0;
"
postgres:
image: postgres:latest
ports:
- 5432:5432
environment:
- POSTGRES_USER=mlflow
- POSTGRES_PASSWORD=password
volumes:
- postgres_data:/var/lib/postgresql/data
mlflow:
build:
context: .
dockerfile: Dockerfile
ports:
- 5050:5000
environment:
- MLFLOW_S3_ENDPOINT_URL=http://s3:9000
- AWS_ACCESS_KEY_ID=mlflow
- AWS_SECRET_ACCESS_KEY=password
command: >
mlflow server
--backend-store-uri postgresql://mlflow:password@postgres:5432/mlflow
--default-artifact-root s3://mlflow/
--artifacts-destination s3://mlflow/
--host 0.0.0.0
depends_on:
- s3
- postgres
- init_s3
volumes:
postgres_data:
minio_data:
Вызовете эти команды для создания имэджа и запуска контенеров docker compose -f tracking_server/docker-compose.yml build
and docker compose -f tracking_server/docker-compose.yml up
.
Вместо обработки артефактов MLflow предоставляет ссылку клиенту, позволяя сохранять и загружать артефакты напрямую из хранилища артефактов. Поэтому вам нужно настроить ключи доступа и хост сервера отслеживания, чтобы правильно логировать артефакты.
import os
import mlflow
os.environ['AWS_ACCESS_KEY_ID'] = 'mlflow'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'password'
os.environ['MLFLOW_TRACKING_URI'] = 'http://localhost:5050'
os.environ['MLFLOW_S3_ENDPOINT_URL'] = 'http://localhost:9000'
# run data preprocessing step one more time
mlflow.run(
uri = '.',
entry_point = 'data-preprocessing',
env_manager='local',
experiment_name='Cancer_Classification',
run_name='Data_Preprocessing',
parameters={'test-size': 0.5},
)
После этого шага мы можем зайти в MLflow UI, чтобы убедиться, что запуск и артефакты были успешно зарегистрированы:
И убедится через интерфейс MinIO, что артефакты были успешно сохранены в бакете:
И также выполнить запрос к нашей базе данных PostgreSQL, чтобы убедиться, что она используется для хранения метаданных:
import psycopg2
import pandas as pd
conn = psycopg2.connect(dbname='mlflow', user='mlflow', password='password', host='localhost', port='5432')
try:
query = "SELECT experiment_id, name FROM experiments"
experiments_df = pd.read_sql(query, conn)
except Exception as e:
print(e)
else:
print(experiments_df)
finally:
conn.close()
В продакшене может потребоваться аутентификация, разные уровни доступа пользователей и мониторинг для MLflow, бакета, базы данных и конкретных артефактов. Мы не будем охватывать эти аспекты здесь, и некоторые из них не имеют встроенной поддержки в MLflow.
Мы рассмотрели как можно использовать MLflow как локально для себя, так и для совасместной работы над проектами, кратко прошлись по основному функционалу и использовали его на примерах. Вы можете посмотреть полный код в этом репозитории, также можно посомтреть документацию Mlflow.