Как правило, когда нужно что-то сделать быстро и дёшево, мы не задумываемся над отказоустойчивостью и масштабируемостью нашего приложения, что через некоторое время обязательно приводит к боли. Современные решения позволяют быстро и просто решить эту проблему.
На примере перехода от монолитного приложения к микросервисам, я попробую показать все плюсы и минусы каждого подхода. Статья разделена на три части:
- В первой части будет рассмотрено монолитное приложение на веб-фреймворке Dash, т.е. генерация данных и их отображение будут находиться в одном месте.
- Вторая часть посвящена разложению монолитного приложения на микросервисы, т.е. генерацией данных будет заниматься один сервис, отображением другой, а связь между ними будет налажена через брокер сообщений Kafka.
- В третьей части микросервисы будут "упакованы" в Docker контейнеры.
Конечное приложение будет выглядеть, как показано на диаграмме снизу.

Введение
Для того чтобы лучше понять пример, желательно иметь хотя бы базовые знания в Kafka и Docker, приведу несколько на мой взгляд дельных курсов и статей:
- По Kafka, очень подробно разобрано на youtube канале Stephane Maarek, канал на английском языке.
- Про Docker мне понравился плейлист с youtube канала letsCode на русском языке.
- Статья на хабре Просто о микросервисах от YuryKa.
Полный код проекта залит на github, можно скачать отсюда.
Часть 1. Монолитное приложение
Для примера монолитного приложения я взял код из официальной документации по Dash (Plotly), посмотреть его можно здесь, в исходном коде проекта находится в папке local_app. Это идеальное решение для быстрого прототипирования.
import datetime
import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output
# pip install pyorbital
from pyorbital.orbital import Orbital
satellite = Orbital('TERRA')
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
html.Div([
html.H4('TERRA Satellite Live Feed'),
html.Div(id='live-update-text'),
dcc.Graph(id='live-update-graph'),
dcc.Interval(
id='interval-component',
interval=1*1000, # in milliseconds
n_intervals=0
)
])
)
@app.callback(Output('live-update-text', 'children'),
[Input('interval-component', 'n_intervals')])
def update_metrics(n):
lon, lat, alt = satellite.get_lonlatalt(datetime.datetime.now())
style = {'padding': '5px', 'fontSize': '16px'}
return [
html.Span('Longitude: {0:.2f}'.format(lon), style=style),
html.Span('Latitude: {0:.2f}'.format(lat), style=style),
html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
]
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph', 'figure'),
[Input('interval-component', 'n_intervals')])
def update_graph_live(n):
satellite = Orbital('TERRA')
data = {
'time': [],
'Latitude': [],
'Longitude': [],
'Altitude': []
}
# Collect some data
for i in range(180):
time = datetime.datetime.now() - datetime.timedelta(seconds=i*20)
lon, lat, alt = satellite.get_lonlatalt(
time
)
data['Longitude'].append(lon)
data['Latitude'].append(lat)
data['Altitude'].append(alt)
data['time'].append(time)
# Create the graph with subplots
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
fig['layout']['margin'] = {
'l': 30, 'r': 10, 'b': 30, 't': 10
}
fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}
fig.append_trace({
'x': data['time'],
'y': data['Altitude'],
'name': 'Altitude',
'mode': 'lines+markers',
'type': 'scatter'
}, 1, 1)
fig.append_trace({
'x': data['Longitude'],
'y': data['Latitude'],
'text': data['time'],
'name': 'Longitude vs Latitude',
'mode': 'lines+markers',
'type': 'scatter'
}, 2, 1)
return fig
if __name__ == '__main__':
app.run_server(debug=True)Приложение каждую секунду обновляет графики и таким образом эмулируется реал-тайм поступление данных. В качестве источника данных используется python пакет pyorbital, с помощью которого можно производить различные астрономические вычисления (в этом примере рассчитывается положение научно-исследовательского спутника Terra (EOS AM-1)). Полученные данные и графики отображаются в браузере через Dash (Plotly) на локальном хосте: 127.0.0.1:8050.
Данные, которые получаются на выходе — это altitude, longitude и latitude (высота, долгота и широта), т.е. координаты спутника в текущий момент времени, первый график показывает изменение высоты, а второй — долготы и широты соответственно.

(На рисунке показана работа монолитного (оригинального) приложения)
К плюсам можно отнести:
- Высокая скорость работы, данные вычисляются и сразу же обновляются.
К минусам:
- Высокая стоимость ошибки, например, если ошибка возникнет при вычислении данных, то и их отображение тоже пострадает, т.к. это повлечет падение всего приложения (хотя конечно можно написать обработчик ошибок, но это усложнит код).
- Невозможность обновить/исправить приложение на лету. Если, например, необходимо изменить алгоритм вычисления данных, необходимо будет перезапустить всё, в том числе и отображение.
Часть 2. Приложение на основе микросервисов
В исходном коде проекта находится в папке local_microservices_app, для удобства туда же я положил сервис Kafka упакованный в Docker, исходный код которого можно посмотреть здесь (взято с github репозитория Stephane Maarek)
Монолитное приложение я разделил на две части, первая часть — backend (producer.py), генерирует данные и отправляет их в Kafka, вторая часть — frontend (consumer.py, graph_display.py) читает сообщения из Kafka и отображает графики в браузере.
backend:
Producer (производитель данных) вычисляет данные и отправляет их в Kafka раз в одну секунду (в оригинальном приложении данные рассчитывались за каждые 20 секунд)
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital
satellite = Orbital('TERRA')
topic = 'test_topic'
producer = Producer({'bootstrap.servers': 'localhost:9092'})
def acked(err, msg):
if err is not None:
print("Failed to deliver message: {}".format(err))
else:
print("Produced record to topic {} partition [{}] @ offset {}"
.format(msg.topic(), msg.partition(), msg.offset()))
# send data every one second
while True:
time = datetime.datetime.now()
lon, lat, alt = satellite.get_lonlatalt(time)
record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})
producer.produce(topic, key=None, value=record_value, on_delivery=acked)
producer.poll()
sleep(1)
frontend:
Consumer (потребитель данных) написан в виде класса MyKafkaConnect и находится в файле consumer.py, при инициализации загружает из Kafka последние 180 (или меньше, если их не достаточно) сообщений. При последующих обращениях докачивает все новые сообщения из Kafka.
Изначальное монолитное приложение (monolith.py) не сильно изменилось, ключевое изменение состоит в том, что данные рассчитываются не на месте, а загружаются через класс MyKafkaConnect, все прежние методы работают фактически также.
import datetime
from confluent_kafka import Consumer, TopicPartition
import json
from collections import deque
from time import sleep
class MyKafkaConnect:
def __init__(self, topic, group, que_len=180):
self.topic = topic
self.conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': group,
'enable.auto.commit': True,
}
# the application needs a maximum of 180 data units
self.data = {
'time': deque(maxlen=que_len),
'Latitude': deque(maxlen=que_len),
'Longitude': deque(maxlen=que_len),
'Altitude': deque(maxlen=que_len)
}
consumer = Consumer(self.conf)
consumer.subscribe([self.topic])
# download first 180 messges
self.partition = TopicPartition(topic=self.topic, partition=0)
low_offset, high_offset = consumer.get_watermark_offsets(self.partition)
# move offset back on 180 messages
if high_offset > que_len:
self.partition.offset = high_offset - que_len
else:
self.partition.offset = low_offset
# set the moved offset to consumer
consumer.assign([self.partition])
self.__update_que(consumer)
# https://docs.confluent.io/current/clients/python.html#delivery-guarantees
def __update_que(self, consumer):
try:
while True:
msg = consumer.poll(timeout=0.1)
if msg is None:
break
elif msg.error():
print('error: {}'.format(msg.error()))
break
else:
record_value = msg.value()
json_data = json.loads(record_value.decode('utf-8'))
self.data['Longitude'].append(json_data['lon'])
self.data['Latitude'].append(json_data['lat'])
self.data['Altitude'].append(json_data['alt'])
self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))
# save local offset
self.partition.offset += 1
finally:
# Close down consumer to commit final offsets.
# It may take some time, that why I save offset locally
consumer.close()
def get_graph_data(self):
consumer = Consumer(self.conf)
consumer.subscribe([self.topic])
# update low and high offsets (don't work without it)
consumer.get_watermark_offsets(self.partition)
# set local offset
consumer.assign([self.partition])
self.__update_que(consumer)
# convert data to compatible format
o = {key: list(value) for key, value in self.data.items()}
return o
def get_last(self):
lon = self.data['Longitude'][-1]
lat = self.data['Latitude'][-1]
alt = self.data['Altitude'][-1]
return lon, lat, alt
# for test
if __name__ == '__main__':
connect = MyKafkaConnect(topic='test_topic', group='test_group')
while True:
test = connect.get_graph_data()
print('number of messages:', len(test['time']),
'unique:', len(set(test['time'])),
'time:', test['time'][-1].second)
sleep(0.1)
import datetime
import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output
from consumer import MyKafkaConnect
connect = MyKafkaConnect(topic='test_topic', group='test_group')
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
html.Div([
html.H4('TERRA Satellite Live Feed'),
html.Div(id='live-update-text'),
dcc.Graph(id='live-update-graph'),
dcc.Interval(
id='interval-component',
interval=1*1000, # in milliseconds
n_intervals=0
)
])
)
@app.callback(Output('live-update-text', 'children'),
[Input('interval-component', 'n_intervals')])
def update_metrics(n):
lon, lat, alt = connect.get_last()
print('update metrics')
style = {'padding': '5px', 'fontSize': '16px'}
return [
html.Span('Longitude: {0:.2f}'.format(lon), style=style),
html.Span('Latitude: {0:.2f}'.format(lat), style=style),
html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
]
# Multiple components can update everytime interval gets fired.
@app.callback(Output('live-update-graph', 'figure'),
[Input('interval-component', 'n_intervals')])
def update_graph_live(n):
# Collect some data
data = connect.get_graph_data()
print('Update graph, data units:', len(data['time']))
# Create the graph with subplots
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
fig['layout']['margin'] = {
'l': 30, 'r': 10, 'b': 30, 't': 10
}
fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}
fig.append_trace({
'x': data['time'],
'y': data['Altitude'],
'name': 'Altitude',
'mode': 'lines+markers',
'type': 'scatter'
}, 1, 1)
fig.append_trace({
'x': data['Longitude'],
'y': data['Latitude'],
'text': data['time'],
'name': 'Longitude vs Latitude',
'mode': 'lines+markers',
'type': 'scatter'
}, 2, 1)
return fig
if __name__ == '__main__':
app.run_server(debug=True)
К плюсам можно отнести:
- Меньше негативных последствий от ошибок, если ошибка возникнет при вычислении данных в backend микросервисе, то в таком случае отображение графиков продолжится, хотя конечно обновляться они не будут (обратите внимание, графики могут быть лишь не большим модулем большого приложения, которое продолжит работу).
- Возможность обновления/исправления ошибок на лету, т.к. модули работают независимо, то кратковременная остановка backend микросервиса не вызовет падения отображения графиков, хотя и будет небольшой лаг.
- Микросервис может быть написан на любом языке программирования.
К минусам:
- Меньшая скорость работы по сравнению с монолитным приложением, между вычислением данных и их отображением появляется посредник в виде Kafka, на работу которого требуется хоть и не большое, но время.

(На рисунке сверху backend после обновления отправляет вдвое больше сообщений, что видно на графике)
Часть 3. Приложение на основе микросервисов упакованных в Docker
В исходном коде проекта находится в папке docker_microservices_app. От второй части отличается тем, что backend и frontend упакованы в Docker. Также я добавил ещё два микросервиса в backend (ещё два научно-исследовательских спутника Aura (EOS CH-1) и Aqua (EOS PM-1)).
FROM python:3.7
RUN python -m pip install confluent-kafka
RUN python -m pip install pyorbital
WORKDIR /app
COPY producer.py ./
CMD ["python", "producer.py"]
FROM python:3.7
RUN python -m pip install confluent-kafka
RUN python -m pip install dash plotly
WORKDIR /app
COPY consumer.py graph_display.py ./
CMD ["python", "graph_display.py"]
version: '2.1'
# Stephane Maarek's kafka-docker
# https://github.com/simplesteph/kafka-stack-docker-compose/blob/master/zk-single-kafka-single.yml
services:
zoo1:
image: zookeeper:3.4.9
hostname: zoo1
ports:
- "2181:2181"
restart: unless-stopped
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zoo1:2888:3888
volumes:
- ./zk-single-kafka-single/zoo1/data:/data
- ./zk-single-kafka-single/zoo1/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.5.0
hostname: kafka1
ports:
- "9092:9092"
restart: unless-stopped
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zoo1
backend_terra:
build:
context: ./backend
restart: unless-stopped
environment:
BOOTSTRAP_SERVERS: "kafka1:19092"
TOPIC: "terra_topic"
SATELLITE: "TERRA"
depends_on:
- kafka1
backend_aqua:
build:
context: ./backend
restart: unless-stopped
environment:
BOOTSTRAP_SERVERS: "kafka1:19092"
TOPIC: "aqua_topic"
SATELLITE: "AQUA"
depends_on:
- kafka1
backend_aura:
build:
context: ./backend
restart: unless-stopped
environment:
BOOTSTRAP_SERVERS: "kafka1:19092"
TOPIC: "aura_topic"
SATELLITE: "AURA"
depends_on:
- kafka1
frontend:
build:
context: ./frontend
ports:
- "8050:8050"
restart: unless-stopped
environment:
BOOTSTRAP_SERVERS: "kafka1:19092"
depends_on:
- backend_terra
- backend_aqua
- backend_aura
from time import sleep
import datetime
from confluent_kafka import Producer
import json
from pyorbital.orbital import Orbital
import os
topic = os.environ['TOPIC']
bootstrap_servers = os.environ['BOOTSTRAP_SERVERS']
s_name = os.environ['SATELLITE']
satellite = Orbital(s_name)
producer = Producer({'bootstrap.servers': bootstrap_servers})
def acked(err, msg):
if err is not None:
print("Failed to deliver message: {}".format(err))
else:
print("Produced record to topic {} partition [{}] @ offset {}"
.format(msg.topic(), msg.partition(), msg.offset()))
# send data every one second
while True:
time = datetime.datetime.now()
lon, lat, alt = satellite.get_lonlatalt(time)
record_value = json.dumps({'lon':lon, 'lat': lat, 'alt': alt, 'time': str(time)})
producer.produce(topic, key=None, value=record_value, on_delivery=acked)
producer.poll()
sleep(1)import datetime
from confluent_kafka import Consumer, TopicPartition
import json
from collections import deque
from time import sleep
class MyKafkaConnect:
def __init__(self, topic, group, que_len=180):
self.topic = topic
self.conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': group,
'enable.auto.commit': True,
}
# the application needs a maximum of 180 data units
self.data = {
'time': deque(maxlen=que_len),
'Latitude': deque(maxlen=que_len),
'Longitude': deque(maxlen=que_len),
'Altitude': deque(maxlen=que_len)
}
consumer = Consumer(self.conf)
consumer.subscribe([self.topic])
# download first 180 messges
self.partition = TopicPartition(topic=self.topic, partition=0)
low_offset, high_offset = consumer.get_watermark_offsets(self.partition)
# move offset back on 180 messages
if high_offset > que_len:
self.partition.offset = high_offset - que_len
else:
self.partition.offset = low_offset
# set the moved offset to consumer
consumer.assign([self.partition])
self.__update_que(consumer)
# https://docs.confluent.io/current/clients/python.html#delivery-guarantees
def __update_que(self, consumer):
try:
while True:
msg = consumer.poll(timeout=0.1)
if msg is None:
break
elif msg.error():
print('error: {}'.format(msg.error()))
break
else:
record_value = msg.value()
json_data = json.loads(record_value.decode('utf-8'))
self.data['Longitude'].append(json_data['lon'])
self.data['Latitude'].append(json_data['lat'])
self.data['Altitude'].append(json_data['alt'])
self.data['time'].append(datetime.datetime.strptime(json_data['time'], '%Y-%m-%d %H:%M:%S.%f'))
# save local offset
self.partition.offset += 1
finally:
# Close down consumer to commit final offsets.
# It may take some time, that why I save offset locally
consumer.close()
def get_graph_data(self):
consumer = Consumer(self.conf)
consumer.subscribe([self.topic])
# update low and high offsets (don't work without it)
consumer.get_watermark_offsets(self.partition)
# set local offset
consumer.assign([self.partition])
self.__update_que(consumer)
# convert data to compatible format
o = {key: list(value) for key, value in self.data.items()}
return o
def get_last(self):
lon = self.data['Longitude'][-1]
lat = self.data['Latitude'][-1]
alt = self.data['Altitude'][-1]
return lon, lat, alt
# for test
if __name__ == '__main__':
connect = MyKafkaConnect(topic='test_topic', group='test_group')
while True:
test = connect.get_graph_data()
print('number of messages:', len(test['time']),
'unique:', len(set(test['time'])),
'time:', test['time'][-1].second)
sleep(0.1)import datetime
import dash
import dash_core_components as dcc
import dash_html_components as html
import plotly
from dash.dependencies import Input, Output
from consumer import MyKafkaConnect
external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
app.layout = html.Div(
html.Div([
html.Div([
html.H4('TERRA Satellite Live Feed'),
html.Div(id='terra-text'),
dcc.Graph(id='terra-graph')
], className="four columns"),
html.Div([
html.H4('AQUA Satellite Live Feed'),
html.Div(id='aqua-text'),
dcc.Graph(id='aqua-graph')
], className="four columns"),
html.Div([
html.H4('AURA Satellite Live Feed'),
html.Div(id='aura-text'),
dcc.Graph(id='aura-graph')
], className="four columns"),
dcc.Interval(
id='interval-component',
interval=1*1000, # in milliseconds
n_intervals=0
)
], className="row")
)
def create_graphs(topic, live_update_text, live_update_graph):
connect = MyKafkaConnect(topic=topic, group='test_group')
@app.callback(Output(live_update_text, 'children'),
[Input('interval-component', 'n_intervals')])
def update_metrics_terra(n):
lon, lat, alt = connect.get_last()
print('update metrics')
style = {'padding': '5px', 'fontSize': '15px'}
return [
html.Span('Longitude: {0:.2f}'.format(lon), style=style),
html.Span('Latitude: {0:.2f}'.format(lat), style=style),
html.Span('Altitude: {0:0.2f}'.format(alt), style=style)
]
# Multiple components can update everytime interval gets fired.
@app.callback(Output(live_update_graph, 'figure'),
[Input('interval-component', 'n_intervals')])
def update_graph_live_terra(n):
# Collect some data
data = connect.get_graph_data()
print('Update graph, data units:', len(data['time']))
# Create the graph with subplots
fig = plotly.tools.make_subplots(rows=2, cols=1, vertical_spacing=0.2)
fig['layout']['margin'] = {
'l': 30, 'r': 10, 'b': 30, 't': 10
}
fig['layout']['legend'] = {'x': 0, 'y': 1, 'xanchor': 'left'}
fig.append_trace({
'x': data['time'],
'y': data['Altitude'],
'name': 'Altitude',
'mode': 'lines+markers',
'type': 'scatter'
}, 1, 1)
fig.append_trace({
'x': data['Longitude'],
'y': data['Latitude'],
'text': data['time'],
'name': 'Longitude vs Latitude',
'mode': 'lines+markers',
'type': 'scatter'
}, 2, 1)
return fig
create_graphs('terra_topic', 'terra-text', 'terra-graph')
create_graphs('aqua_topic', 'aqua-text', 'aqua-graph')
create_graphs('aura_topic', 'aura-text', 'aura-graph')
if __name__ == '__main__':
app.run_server(
host='0.0.0.0',
port=8050,
debug=True)К плюсам можно отнести (кроме тех, что указаны во второй части):
- Изоляция ресурсов, отсутствие конфликтов библиотек различных версий и т.д. (каждый модуль приложения можно упаковать в отдельный Docker-контейнер со всем своим окружением и зависимостями).
- Быстрый и удобный запуск приложения на любом хосте.
- Лёгкая масштабируемость приложения, легко можно добавить новые контейнеры
К минусам:
- Т.к. Docker является дополнительным посредником между ОС и приложением, это приводит к увеличению нагрузки и расходу большего количества ресурсов.
На рисунке ниже показана ситуация, при которой один backend микросервис перестал отправлять данные и так как всё приложение состоит из микросервисов, то оно продолжает работать (в отличие от монолитной конструкции), более того, когда этот микросервис возобновил работу, он подключился на лету, и не потребовалось всё перезапускать.

(Не смотря на то, что один микросервис завис, остальные продолжают работать)
Выводы
В данной статье я рассмотрел, на мой взгляд наиболее очевидные (далеко не все) достоинства и недостатки каждого подхода. Кроме того, хоть внедрение Kafka и кажется дополнительным усложнением проекта, но на самом деле задачи, которые необходимо решить обычно являются типовыми, такими как непрерывное чтение данных из бд или их запись, слив данных из нескольких бд в одну и т.д. Для подобных целей нет необходимости изобретать велосипед, можно использовать готовые решения из Kafka connectors, там есть поддержка для фактически всех хоть сколько-нибудь известных бд.
Дополнительные ссылки по теме:
Python + Kafka =? / Николай Сасковец/ bitnet [Python Meetup 14.09.2019]
Николай Сасковец, Построение микросервисных систем с использованием Kafka
jonSina
Я надеюсь вы знаете что внутри образа cp-kafka:5.5.0 прописан хитрый mount (можете посмотреть через инспект) и ваши монтировки в композе не верны.
DimaFromMai Автор
Кафку взял отсюда, это репозиторий от Stephane Maarek, у него ещё есть youtube канал по Kafka (правда он на английском языке), также ещё на udemy есть его аккаунт, там тоже есть курсы по Kafka, но они платные.