Если кто-то не в курсе, Nautobot - форк Netbox, его продвигает широко известный в узких кругах провайдер сетевой автоматизации NTC (Network to code). Возможно, порывшись в памяти, вы вспомните не очень красивую историю начала прошлого года про затирание истории коммитов Netbox - дело как раз касалось Nautobot и NTC. Но речь пойдет не о правилах приличия опенсорсного сообщества, а о том, как легко и непринужденно написать свой плагин тому, кто выбрал в качестве источника истины и платформы автоматизации Nautobot, а не Netbox. Хотя я уверен, что реализация плагина, о котором пойдет речь дальше, так же легко может быть выполнена и для Netbox.
Постановка задачи
В нашей не очень большой сети имеется ~50 филиалов, в каждом из которых установлены коммутаторы с общим количество портов от 300 до 500. На самом деле филиалов больше, но начнем мы именно с этого количества, потому что во всех из них установлено оборудование Cisco. Это важно, так как наш плагин будет пока одновендорным.
По каждому порту каждого коммутатора нам необходимо знать:
когда последний раз было подключение к этому порту
устройство с каким MAC/IP адресом и в каком VLAN подключено к этому порту
“Постой-ка, автор” - скажете вы - “такой функционал я уже где-то видел”. Совершенно точно, это switchmap (гуглится по словосочетанию cisco switchmap). Что ж, никто не обещал, что в статье будет совершенно новая идея для плагина. Но эта идея очень хорошо ложится в концепцию использования Nautobot как платформы автоматизации и SoT.
Сначала я хотел отдать дань уважения разработчикам switchmap и увековечить их творение в названии своего плагина. Но в итоге остановился на более объясняющем функционал названии - nautobot-porthistory-plugin (Sorry guys)
Стек
Ну тут всё просто. Раз мы используем Nautobot, будем писать на Python и активно использовать модули Django. Никаких внешних скриптов, все внутри Nautobot. Забирать данные с сетевых устройств будем по SNMP. Коммутаторы и маршрутизаторы - Cisco, все заведены в Nautobot, у каждого устройства учитываются primary address и все интерфейсы, для каждого филиала учитывается список префиксов и VLAN.
Что насчет документации?
У Nautobot замечательная документация. Еще бы, ведь она почти полностью скопирована у Netbox. Но все изменения и отличия от Netbox документируются не менее хорошо, за это разработчикам отдельное спасибо. Раздел про разработку плагинов достаточно большой - https://nautobot.readthedocs.io/en/stable/plugins/development/
Но вот только первое прочтение официально туториала не дало ответы на многие вопросы (впрочем, как и последующие). Многие знания пришлось выдергивать из исходного кода самого Nautobot, а также плагинов, коих становится все больше и больше.
Итак, начнем
В соответствии поставленной задаче разделим разработку плагина на две части. В первой части сделаем что полегче - научим плагин определять время последнего использования портов коммутатора, хранить и выводить эту информацию пользователям.
По окончании этого этапа ожидается, что наш плагин выполнит следующее:
создаст в БД таблицу для хранения информации о неиспользуемых интерфейсах;
создаст job, который будет с нужной нам периодичностью заполнять созданную таблицу;
изменит страницу вывода информации о коммутаторе так, чтобы было видно, какие порты не используются больше заданного количества дней.
Пока действуем строго по документации
Создадим в каталоге nautobot-plugin-porthistory скелет плагина на сервере (работаем под пользователем nautobot).
Nautobot includes a command to help create the plugin directory: nautobot-server startplugin [app_name]
Команда nautobot-server startplugin nautobot_porthistory_plugin
создаст следующий набор папок и файлов:
.
└── nautobot_porthistory_plugin
├── __init__.py
├── migrations
│ └── __init__.py
├── models.py
├── navigation.py
├── tests
│ ├── __init__.py
│ ├── test_models.py
│ └── test_views.py
├── urls.py
└── views.py
Ну что, файлы есть, давайте наполним их работающим кодом.
Файл пакета __init__.py уже содержит большинство нужных параметров. Всё, что нам требуется, указать какие опции конфига обязательны для заполнения, а также настройки плагина по умолчанию.
__init__.py
"""nautobot_porthistory_plugin Plugin Initilization."""
from nautobot.extras.plugins import PluginConfig
class NautobotPorthistoryPluginConfig(PluginConfig):
"""Plugin configuration for the nautobot_porthistory_plugin plugin."""
name = "nautobot_porthistory_plugin" # Raw plugin name; same as the plugin's source directory.
verbose_name = "nautobot_porthistory_plugin" # Human-friendly name for the plugin.
base_url = "nautobot_porthistory_plugin" # (Optional) Base path to use for plugin URLs. Defaulting to app_name.
required_settings = [] # A list of any configuration parameters that must be defined by the user.
min_version = "1.0.0" # Minimum version of Nautobot with which the plugin is compatible.
max_version = "1.999" # Maximum version of Nautobot with which the plugin is compatible.
default_settings = {} # A dictionary of configuration parameters and their default values.
caching_config = {} # Plugin-specific cache configuration.
# А вот и нужная нам конфигурация
required_settings = ['switches_role_slug']
default_settings = {
'min_idle_days': 14,
'snmp_community': 'public',
'workers': 50,
}
config = NautobotPorthistoryPluginConfig
Параметры, которые будем передавать в плагин:
switches_role_slug
- Роль, по которой будем фильтровать коммутаторыmin_idle_days
- Если порт не используется меньше дней, он нас не интересуетworkers
- количество параллельных асинхронных запросов к оборудованию
Модель определим в models.py. В таблице UnusedPorts будем хранить время обновления, время последнего использования интерфейса и собственно ID самого интерфейса. Как видите, пока вообще ничего сложного.
models.py
"""Model definition for nautobot_porthistory_plugin."""
from django.db import models
from nautobot.core.models import BaseModel
from nautobot.dcim.fields import MACAddressCharField
class UnusedPorts(BaseModel):
# Дата/время последнего output на порту коммутатора
updated = models.DateTimeField(auto_now=True)
last_output = models.DateTimeField()
interface = models.ForeignKey(
to="dcim.Interface",
on_delete=models.CASCADE,
blank=False,
)
def __str__(self):
return f'{self.interface.name} - {self.last_output}'
Аргумент auto_now=True
указывает, что при каждом сохранении данных в таблицу поле updated автоматически обновляется до текущего времени
Файлы navigation.py, urls.py и views.py пока нам не нужны, оставим их в первозданном виде. Эти три файла понадобятся на следующем этапе, когда мы сделаем отдельную ссылку на плагин в меню. Но, как обычно, есть нюанс. Импорты в views.py “из коробки” ссылаются на несуществующие модули django, поэтому чтобы не словить исключение при установке, закомментим строчку from django.views.generic import views
Итак, у нас в БД есть таблица, но в ней нет данных. Создадим джоб (job в Nautobot - как custom script и report в Netbox), который соберет информацию с коммутаторов, обработает ее и сохранит в нашей таблице.
Добавим файл jobs.py с классом (джобом) UnusedPortsUpdate
Логика его работы проста:
Сгенерировать список коммутаторов - используем фильтр по device_role.slug = switches_role_slug из конфига
Асинхронно опросить все коммутаторы по SNMP - uptime устройства
Асинхронно опросить все коммутаторы по SNMP - соответствие ifindex и названий портов
Асинхронно опросить все коммутаторы по SNMP - Last output на портах
Если Last output отрицательный смотрим, есть ли в таблице информация по данному порту. Если отсутствует - добавляем дату последнего использования равной дате загрузке (п.2). Если данные есть, просто обновляем дату проверки
Если Last output, переведенный в дни, меньше, чем min_idle_days из конфига, удаляем запись о порте из таблицы. Иначе создаем/обновляем запись.
На всякий случай в необязательные входные параметры добавим Site, чтобы иметь возможность обновлять информацию по конкретному филиалу.
Код jobs.py
jobs.py
from nautobot.dcim.models import Device, DeviceRole, Site, Interface
from nautobot.extras.jobs import Job, ObjectVar
from nautobot.extras.models import Status
from django.conf import settings
from nautobot_porthistory_plugin.models import UnusedPorts
import asyncio
import aiosnmp
from collections import defaultdict
from netutils.interface import canonical_interface_name
from datetime import datetime, timedelta
class UnusedPortsUpdate(Job):
class Meta:
name = "Обновление информации о неподключенных интерфейсах"
site = ObjectVar(
model=Site,
label='БЮ',
required=False
)
async def bulk_snmp(self, device, oid_list, community):
oid_results = {}
try:
async with aiosnmp.Snmp(
host=device,
port=161,
community=community,
timeout=5,
retries=3,
max_repetitions=10,
) as snmp:
oid_bulk_result = {}
for oid in oid_list:
reply = await snmp.bulk_walk(oid)
for index in reply:
oid_bulk_result[index.oid] = index.value
oid_results[oid] = oid_bulk_result
return (device, oid_results)
except Exception as error:
return (device, error)
return (device, None)
async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
async with semaphore:
return await function(*args, **kwargs)
async def async_bulk_snmp(self, devices, oid_list, community, workers):
semaphore = asyncio.Semaphore(workers)
coroutines = [
self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
for device in devices
]
result = []
for future in asyncio.as_completed(coroutines):
result.append(await future)
return result
def round_datetime(self, date):
date_tuple = date.timetuple()
return datetime(year=date_tuple.tm_year,
month=date_tuple.tm_mon,
day=date_tuple.tm_mon,
hour=date_tuple.tm_hour, minute=0, second=0, microsecond=0
)
def run(self, data, commit):
# запускать job могут только пользователи is_superuser
if not self.request.user.is_superuser:
self.log_info(message='Неавторизованный запуск')
return
PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
COMMUNITY = PLUGIN_CFG['snmp_community']
MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
WORKERS = PLUGIN_CFG['workers']
STATUS_ACTIVE = Status.objects.get(slug='active')
# сгенерируем справочник устройств
devices = [] #этот список передадим в модуль snmp
device_dict = defaultdict(dict)
device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)
if data['site']:
nb_devices = Device.objects.filter(site=data['site'], device_role__in=device_role, status=STATUS_ACTIVE)
else:
nb_devices = Device.objects.filter(device_role__in=device_role, status=STATUS_ACTIVE)
for nb_device in nb_devices:
if nb_device.platform and nb_device.platform.napalm_driver and nb_device.platform.napalm_driver == 'cisco_iosxe' and nb_device.primary_ip4:
primary_ip = str(nb_device.primary_ip4).split('/')[0]
devices.append(primary_ip)
device_dict[primary_ip]['device'] = nb_device
device_dict[primary_ip]['interfaces'] = {}
device_dict[primary_ip]['ifindexes'] = {}
device_interfaces = Interface.objects.filter(device_id=nb_device)
for intf in device_interfaces:
device_dict[primary_ip]['interfaces'][intf.name] = [intf]
# получим uptime оборудования по SNMP (в секундах)
# и занесем эту информацию в справочник
oid_list = ['.1.3.6.1.6.3.10.2.1.3']
results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
for device_ip, device_result in results:
if type(device_result) != dict:
self.log_warning(obj=device_dict[device_ip]['device'],message=f'не удалось получить информацию по SNMP - {device_result}')
continue
for oid, oid_result in device_result.items():
for uptime in oid_result.values():
device_dict[device_ip]['uptime'] = uptime
boottime = datetime.now() - timedelta(seconds=uptime)
device_dict[device_ip]['boottime'] = boottime
# получим названия интерфейсов и их индексы с оборудования по SNMP
# и занесем эту информацию в справочник
oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
for device_ip, device_result in results:
if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
continue
for oid, oid_result in device_result.items():
for index, index_result in oid_result.items():
ifindex = index.split('.')[-1]
canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
if canonical_intf_name in device_dict[device_ip]['interfaces']:
device_dict[device_ip]['ifindexes'][ifindex] = canonical_intf_name
# получим время последнего output по SNMP
oid_list = ['.1.3.6.1.4.1.9.2.2.1.1.4']
results = asyncio.run(self.async_bulk_snmp(devices, oid_list, COMMUNITY, WORKERS))
output = ''
for device_ip, device_result in results:
if type(device_result) != dict or 'uptime' not in device_dict[device_ip]:
continue
nb_device = device_dict[device_ip]['device']
boottime = device_dict[device_ip]['boottime']
uptime = device_dict[device_ip]['uptime']
output += f'{nb_device.name} - power on {boottime}\n'
unused_port_count = 0
for oid, oid_result in device_result.items():
for index, time_from_last_output in oid_result.items():
ifindex = index.split('.')[-1]
if ifindex in device_dict[device_ip]['ifindexes']:
intf_name = device_dict[device_ip]['ifindexes'][ifindex]
nb_interface = device_dict[device_ip]['interfaces'][intf_name][0]
if time_from_last_output < 0 or time_from_last_output / 1000 > uptime - 300:
unused_port_count += 1
unused_port, created = UnusedPorts.objects.get_or_create(
interface=nb_interface,
defaults={
'last_output': boottime
}
)
unused_port.save()
else:
last_output = datetime.now() - timedelta(seconds=round(time_from_last_output/1000))
if 1000 * 60 * 60 * 24 * MIN_IDLE_DAYS > time_from_last_output:
# прошло меньше MIN_IDLE_DAYS дней
UnusedPorts.objects.filter(interface=nb_interface).delete()
else:
unused_port_count += 1
unused_port, created = UnusedPorts.objects.get_or_create(
interface=nb_interface,
defaults={
'last_output': last_output
}
)
if not created:
unused_port.last_output = last_output
unused_port.save()
output += f'неиспользуемых в течении {MIN_IDLE_DAYS} дн. портов - {unused_port_count}\n'
return output
jobs = [UnusedPortsUpdate]
В дальнейшем можно будет запланировать регулярный запуск джоба, чтобы всегда иметь актуальный список неиспользуемых портов. Напомню, что в Nautobot для этого есть встроенный шедулер джобов. Всего лишь надо при запуске джоба указать периодичность.
Теперь научим плагин выводить полученные данные на странице коммутатора. Для этого добавим в корень плагина файлы template_content.py и templates/unused_ports.html
Названия говорят сами за себя - подготовить содержимое шаблона и отрендерить блок HTML. Отрендеренный блок разместим на странице Device, в правой части.
template_content.py
from nautobot.extras.plugins import PluginTemplateExtension
from django.conf import settings
from .models import UnusedPorts
class DeviceUnusedPorts(PluginTemplateExtension):
"""Template extension to display unused ports on the right side of the page."""
model = 'dcim.device'
def right_page(self):
PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
MIN_IDLE_DAYS = PLUGIN_CFG.get('min_idle_days', 14)
device = self.context['object']
if device.device_role.slug in SWITCHES_ROLE_SLUG:
device_intefaces = device.interfaces.values_list("id", flat=True)
unused_ports = UnusedPorts.objects.filter(interface_id__in=device_intefaces)
unused_ports_with_delta = []
for port in unused_ports:
unused_ports_with_delta.append({
'interface_name': port.interface.name,
'last_output': port.last_output.strftime("%d.%m.%Y %H:%M"),
'updated': port.updated.strftime("%d.%m.%Y %H:%M"),
'delta': str(port.updated - port.last_output).split()[0]
})
return self.render('unused_ ports.html', extra_context={
'unused_ports': unused_ports_with_delta,
'min_idle_days': MIN_IDLE_DAYS
})
else:
return ''
template_extensions = [DeviceUnusedPorts]
templates/unused_ports.html
<div class="panel panel-default">
<div class="panel-heading"><strong>Неиспользуемые порты (в течение последних {{ min_idle_days }} дн.)</strong></div>
<table class="table table-hover panel-body attr-table">
<tr>
<td>Порт</td>
<td>Время последней активности</td>
<td>Обновлено (UTC)</td>
</tr>
{% for item in unused_ports %}
<tr>
<td>{{ item.interface_name }}</td>
<td>{{ item.last_output }} (прошло ~{{ item.delta }} дн.)</td>
<td>{{ item.updated }}</td>
</tr>
{% endfor %}
</table>
</div>
Осталось смешать все ингредиенты и попробовать, что получилось:
Создадим миграции - стандартный механизм Django для обеспечения версионности БД в обертке Nautobot.
nautobot-server makemigrations
(выполняется из папки плагина)добавим файлы setup.py и MANIFEST.in
установим плагин простым
pip3 install .
из каталога плагинавключим плагин в конфигурационном файле nautobot и там же укажем параметры плагина
PLUGINS = [
'nautobot_porthistory_plugin',
]
PLUGINS_CONFIG = {
'nautobot_porthistory_plugin': {
'switches_role_slug': ['Access-switch'],
'min_idle_days': 14,
}
}запустим пост-инсталляционный скрипт
nautobot-server post_upgrade
. Он обновит БД и скопирует HTML файлы в соответствующую папку.Перезапустим сервисы:
sudo systemctl restart nautobot nautobot-worker
Готово! Можно запустить джоб и проверить результаты его выполнения:
Полный код плагина, созданный на этом этапе, можно подсмотреть по ссылке - https://github.com/iontzev/nautobot-porthistory-plugin/tree/part_1
Этап второй - прокачка плагина
Вторая часть реализации будет посложнее. Тем интереснее, ведь это позволит понять, что еще могут плагины в Nautobot. Сперва добавим еще одну модель в БД - таблицу, в которой будем хранить MAC и IP адреса с привязкой к интерфейсу. Способ уже знакомый - обновить models.py и создать новую миграцию nautobot-server makemigrations
from nautobot.dcim.fields import MACAddressCharField
class MAConPorts(BaseModel):
# MAC и IP на порту коммутатора
updated = models.DateTimeField(auto_now=True)
mac = MACAddressCharField(blank=False, verbose_name="MAC Address")
vlan = models.ForeignKey(
to="ipam.VLAN",
on_delete=models.CASCADE,
blank=False,
)
ipaddress = models.ForeignKey(
to="ipam.IPAddress",
on_delete=models.SET_NULL,
default=None,
blank=True,
null=True,
)
interface = models.ForeignKey(
to="dcim.Interface",
on_delete=models.CASCADE,
blank=False,
)
device = models.ForeignKey(
to="dcim.Device",
on_delete=models.CASCADE,
blank=False,
)
def __str__(self):
return f'{self.intervace} - VLAN {seld.vlan.vid} MAC {self.mac}'
class Meta:
verbose_name_plural = 'MAC and IP on switches ports'
В jobs.py добавим еще один класс - он же джоб. Алгоритм его работы относительно несложный:
асинхронно опросить по SNMP все устройства с ролью device_role.slug = switches_role_slug из конфига - получить mac address table и соответствия MAC именам портов.
асинхронно опросить по SNMP все устройства с ролью device_role.slug = routers_role_slug из конфига - получить ARP таблицу. Не забудем, что конфиг тоже надо подправить - он находится в __init__.py
попробовать получить hostname по IP
если IP адрес не внесен в Nautobot, добавить
обновить информацию о привязке MAC и IP к интерфейсам устройств
Обновление построено таким образом, что если MAC на порту пропадет физически, мы сможем его найти в истории подключений, если на этом порту коммутатора больше ничего не появлялось.
Новый джоб в jobs.py
class MAConPortsUpdate(Job):
class Meta:
name = "Обновление информации о подключенных устройствах"
site = ObjectVar(
model=Site,
label='БЮ',
required=False
)
async def bulk_snmp(self, device, oid_list, community):
oid_results = {}
try:
async with aiosnmp.Snmp(
host=device,
port=161,
community=community,
timeout=5,
retries=3,
max_repetitions=10,
) as snmp:
oid_bulk_result = {}
for oid in oid_list:
reply = await snmp.bulk_walk(oid)
for index in reply:
oid_bulk_result[index.oid] = index.value
oid_results[oid] = oid_bulk_result
return (device, oid_results)
except Exception as error:
return (device, error)
return (device, None)
async def bulk_snmp_with_semaphore(self, semaphore, function, *args, **kwargs):
async with semaphore:
return await function(*args, **kwargs)
async def async_bulk_snmp(self, devices, oid_list, community, workers):
semaphore = asyncio.Semaphore(workers)
coroutines = [
self.bulk_snmp_with_semaphore(semaphore, self.bulk_snmp, device, oid_list, community)
for device in devices
]
result = []
for future in asyncio.as_completed(coroutines):
result.append(await future)
return result
def run(self, data, commit):
# запускать job могут только пользователи is_superuser
if not self.request.user.is_superuser:
self.log_info(message='Неавторизованный запуск')
return
PLUGIN_CFG = settings.PLUGINS_CONFIG['nautobot_porthistory_plugin']
COMMUNITY = PLUGIN_CFG['snmp_community']
SWITCHES_ROLE_SLUG = PLUGIN_CFG['switches_role_slug']
ROUTERS_ROLE_SLUG = PLUGIN_CFG['routers_role_slug']
WORKERS = PLUGIN_CFG['workers']
STATUS_ACTIVE = Status.objects.get(slug='active')
STATUS_STATIC = Status.objects.get(slug='static')
STATUS_DHCP = Status.objects.get(slug='dhcp')
device_role = DeviceRole.objects.filter(slug__in=SWITCHES_ROLE_SLUG)
devices = defaultdict(dict)
devices_list = []
vlans = defaultdict(list)
# построим список всех связей, чтобы потом исключить из результатов линки между свичами
cable_set = defaultdict(set)
all_cables = Cable.objects.all()
for cable in all_cables:
if cable.termination_a_type == ContentType.objects.get(app_label='dcim', model='interface'):
if not data['site'] or cable.termination_a.device.site == data['site']:
cable_set[cable.termination_a.device.name].add(cable.termination_a.name)
if cable.termination_b_type == ContentType.objects.get(app_label='dcim', model='interface'):
if not data['site'] or cable.termination_b.device.site == data['site']:
cable_set[cable.termination_b.device.name].add(cable.termination_b.name)
# сгенерируем справочник вланов с разбивкой по сайтам
vlans_by_site = defaultdict(list)
if data['site']:
nb_vlans = VLAN.objects.filter(site=data['site'], status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True})
else:
nb_vlans = VLAN.objects.filter(status=STATUS_ACTIVE, _custom_field_data={'flag-porthistory':True})
for nb_vlan in nb_vlans:
vlans_by_site[nb_vlan.site.name].append(nb_vlan.vid)
# сгенерируем справочник устройств
for site in vlans_by_site:
site_id = Site.objects.get(name=site)
nb_devices_in_site = Device.objects.filter(
site=site_id,
device_role__in=device_role,
status=STATUS_ACTIVE,
)
for nb_device in nb_devices_in_site:
if (nb_device.platform and
nb_device.platform.napalm_driver and
nb_device.platform.napalm_driver == 'cisco_iosxe' and
nb_device.primary_ip4):
primary_ip = str(nb_device.primary_ip4).split('/')[0]
devices_list.append(primary_ip)
device = devices[primary_ip] = {}
device['device'] = nb_device
device['site'] = nb_device.site
device['interfaces'] = {}
device['ifindexes'] = {}
device['bridge_ports'] = {}
device['vlans'] = vlans_by_site[site]
for intf in Interface.objects.filter(device_id=nb_device):
device['interfaces'][intf.name] = intf
for vlan in vlans_by_site[site]:
vlans[vlan].append(primary_ip)
# получим названия интерфейсов и их индексы с оборудования по SNMP
oid_list = ['.1.3.6.1.2.1.31.1.1.1.1']
results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, COMMUNITY, WORKERS))
for device_ip, device_result in results:
if type(device_result) != dict:
self.log_warning(obj=devices[device_ip]['device'],message=f'не удалось получить информацию по SNMP')
del devices[device_ip]
devices_list.remove(device_ip)
continue
for oid, oid_result in device_result.items():
for index, index_result in oid_result.items():
ifindex = index.split('.')[-1]
canonical_intf_name = canonical_interface_name(index_result.decode("utf-8"))
if canonical_intf_name in devices[device_ip]['interfaces']:
devices[device_ip]['ifindexes'][ifindex] = canonical_intf_name
# пройдемся по списку вланов и получим с устройства таблицу MAC адресов для каждого влана
# MAC адреса в десятичном формате
port_mac_relation = defaultdict(list)
for vlan, devices_dict in vlans.items():
self.log_info(message=f'Получаем информацию по VLAN {vlan}')
community_with_vlan = f'{COMMUNITY}@{vlan}'
devices_list = [device for device in devices_dict if device in devices_list]
# получим bridge ports с оборудования по SNMP (зависит от VLAN)
oid_list = ['.1.3.6.1.2.1.17.1.4.1.2']
results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS))
for device_ip, device_result in results:
if type(device_result) != dict:
# скорее всего, такого VLAN нет на этом устройстве
continue
for oid, oid_result in device_result.items():
for index, index_result in oid_result.items():
bridge_port = index.split('.')[-1]
ifindex = str(index_result)
if ifindex in devices[device_ip]['ifindexes']:
ifname = devices[device_ip]['ifindexes'][ifindex]
nb_interface = devices[device_ip]['interfaces'][ifname]
devices[device_ip]['bridge_ports'][bridge_port] = nb_interface
else:
oid_list = ['.1.3.6.1.2.1.17.4.3.1.2']
results = asyncio.run(self.async_bulk_snmp(devices_list, oid_list, community_with_vlan, WORKERS))
for device_ip, device_result in results:
nb_device = devices[device_ip]['device']
nb_vlan = VLAN.objects.get(vid=vlan, site_id=nb_device.site.id)
if type(device_result) != dict:
continue
for oid, oid_result in device_result.items():
for mac_dec, bridge_port in oid_result.items():
if str(bridge_port) in devices[device_ip]['bridge_ports']:
if (devices[device_ip]['bridge_ports'][str(bridge_port)].name not in cable_set[nb_device.name]
and not devices[device_ip]['bridge_ports'][str(bridge_port)]._custom_field_data.get('flag-ignore-mac')):
# преобразуем MAC из десятичного формата в шестнадцатеричный
mac_hex = ''.join(['{0:x}'.format(int(i)).zfill(2) for i in mac_dec.split('.')[-6:]]).upper()
port_mac_relation[devices[device_ip]['bridge_ports'][str(bridge_port)].id].append({
'vlan': nb_vlan,
'mac': mac_hex,
})
# подготовим список L3 устройств
routers = defaultdict(dict)
routers_list = []
device_role = DeviceRole.objects.filter(slug__in=ROUTERS_ROLE_SLUG)
for site in vlans_by_site:
site_id = Site.objects.get(name=site)
nb_devices_in_site = Device.objects.filter(
site=site_id,
device_role__in=device_role,
status=STATUS_ACTIVE,
)
for nb_device in nb_devices_in_site:
if (nb_device.platform and
nb_device.platform.napalm_driver and
nb_device.platform.napalm_driver == 'cisco_iosxe' and
nb_device.primary_ip4):
primary_ip = str(nb_device.primary_ip4).split('/')[0]
routers_list.append(primary_ip)
router = routers[primary_ip] = {}
router['site'] = nb_device.site.name
router['device'] = nb_device
arp = defaultdict(dict)
# получим ARP-таблицу с оборудования по SNMP
oid_list = ['.1.3.6.1.2.1.3.1.1.2']
results = asyncio.run(self.async_bulk_snmp(routers_list, oid_list, COMMUNITY, WORKERS))
for device_ip, device_result in results:
site = routers[device_ip]['site']
arp[site] = defaultdict(list)
if type(device_result) != dict:
self.log_warning(obj=routers[device_ip]['device'],message=f'не удалось получить информацию по SNMP')
continue
for oid, oid_result in device_result.items():
for index, index_result in oid_result.items():
snmp_address = '.'.join(index.split('.')[-4:])
snmp_mac = ''.join(["{0:x}".format(int(i)).zfill(2) for i in index_result]).upper()
arp[site][snmp_mac].append(snmp_address)
output = ''
for device in devices.values():
nb_device = device['device']
site = nb_device.site.name
output += f'device {nb_device} :'
mac_on_device = ip_on_device = name_on_device = 0
for intf in device['interfaces'].values():
if len(port_mac_relation[intf.id]) > 0:
MAConPorts.objects.filter(interface=intf).delete()
for vlan_and_mac in port_mac_relation[intf.id]:
mac_on_device += 1
nb_prefixes = Prefix.objects.filter(vlan_id=vlan_and_mac['vlan'].id)
addresses = arp[site].get(vlan_and_mac['mac'])
address_with_prefix = ''
if nb_prefixes and addresses:
for nb_prefix in nb_prefixes:
for address in addresses:
if IPv4Address(address) in IPv4Network(str(nb_prefix)):
prefixlen = str(nb_prefix).split('/')[-1]
address_with_prefix = f'{address}/{prefixlen}'
break
else:
continue
break
if address_with_prefix:
ip_on_device += 1
try:
hostname, aliaslist, ipaddrlist = socket.gethostbyaddr(address)
name_on_device += 1
except:
hostname=''
nb_address, created = IPAddress.objects.get_or_create(
address=address_with_prefix,
vrf=nb_prefix.vrf,
defaults={
'status': STATUS_STATIC,
'dns_name': hostname
}
)
if created:
self.log_success(obj=nb_address, message=f'Добавлен IP адрес {hostname}')
elif nb_address.status != STATUS_DHCP and hostname and nb_address.dns_name != hostname:
old_hostname = nb_address.dns_name
nb_address.dns_name = hostname
nb_address.save()
self.log_success(obj=nb_address, message=f'Обновлено DNS name "{old_hostname}" -> "{hostname}"')
else:
nb_address = None
mac, created = MAConPorts.objects.get_or_create(
vlan=vlan_and_mac['vlan'],
mac=vlan_and_mac['mac'],
defaults={
'interface': intf,
'device': nb_device,
'ipaddress': nb_address,
}
)
if not created:
updated = False
if nb_address and mac.ipaddress != nb_address:
self.log_info(obj=nb_address, message=f'Устройство с MAC {mac.mac} поменяло IP {mac.ipaddress} -> {nb_address}')
mac.ipaddress = nb_address
updated = True
if mac.interface != intf:
self.log_info(obj=intf, message=f'MAC {mac.mac} переехал с порта "{mac.interface}"')
mac.interface = intf
mac.device = nb_device
updated = True
if updated:
mac.save()
output += f" MAC count - {mac_on_device}, IP count - {ip_on_device}, resolved to hostname - {name_on_device}\n"
return output
Важно! Особенность получения по SNMP таблицы MAC с коммутаторов Cisco в том, что необходимо указывать в строке community номер VLAN, в котором живут MAC адреса.
Еще учтем, что не про все VLAN надо знать. Например, если в каком-то VLAN расположены беспроводные клиенты, в них бессмысленно отслеживать MAC. То же самое касается портов. Значит нам нужен механизм фильтрации VLAN и портов коммутаторов. Сюда отлично подойдут custom_fields, но использовать в скриптах названия полей, которые надо добавить пользователю, значит надеяться на маленькое чудо.
Так давайте будем реалистами, добавим custom_fileds при установке плагина. Поможет нам в этом функционал Django signals, опять же в обертке Nautobot.
Добавим вызов сигнала в __init__.py, а сам сигнал в signals.py.
__init__.py
class NautobotPorthistoryPluginConfig(PluginConfig):
............
def ready(self):
super().ready()
nautobot_database_ready.connect(create_custom_fields_for_porthistory, sender=self)
signals.py
from nautobot.extras.choices import CustomFieldTypeChoices
def create_custom_fields_for_porthistory(sender, apps, **kwargs):
"""Create a custom field flag_porthistory for VLAN if it doesn't already exist."""
# Use apps.get_model to look up Nautobot core models
ContentType = apps.get_model("contenttypes", "ContentType")
CustomField = apps.get_model("extras", "CustomField")
VLAN = apps.get_model("ipam", "VLAN")
Interface = apps.get_model("dcim", "Interface")
# Create custom fields
cf_for_vlan, created = CustomField.objects.update_or_create(
name="flag-porthistory",
defaults={
"label": "Search MACs on ports in this VLAN",
"type": CustomFieldTypeChoices.TYPE_BOOLEAN,
},
)
cf_for_vlan.content_types.set([ContentType.objects.get_for_model(VLAN)])
cf_for_interface, created = CustomField.objects.update_or_create(
name="flag-ignore-mac",
defaults={
"label": "Ignore MACs on this port",
"type": CustomFieldTypeChoices.TYPE_BOOLEAN,
},
)
cf_for_interface.content_types.set([ContentType.objects.get_for_model(Interface)])
Теперь при установке плагина автоматически добавятся новые поля к моделям VLAN и Device.interface. Отметив на странице редактирования VLAN галочку Search MACs on ports in this VLAN мы тем самым разрешаем нашему джобу искать MAC адреса в этом VLAN.
Итак, у нас есть джоб и настроенные VLAN, а значит можно заполнять БД нужными данными. Но как посмотреть эти данные? Первый путь нам уже знаком - используем шаблоны для страниц интерфейса и IP-адреса. Таким образом можно точечно извлекать информацию из БД и показывать пользователю:
Второй путь - создать отдельную страницу плагина с возможностью вывода и поиска полученных данных. Этот путь более тернист, чем первый, но и результаты более интересны.
В подготовке страницы плагина участвуют 6 составляющих. Число их можно сократить, если использовать HTML-шаблоны, но это немного другая история
Итак, по порядку:
navigation.py - добавляет строку в выпадающее меню Plugins
urls.py - вешает на url (добавленной строки в п.1) обработчик
views.py - содержит обработчики (View). Обработчик собирает в одно целое данные, параметры вывода (таблицы), форму и алгоритмы фильтрации
tables.py - выводит таблицу из БД
forms.py - формирует HTML форму для фильтрации данных в таблице
filters.py - содержит алгоритмы фильтрации данных
navigation.py
from nautobot.extras.plugins import PluginMenuItem
menu_items = (
PluginMenuItem(
link = 'plugins:nautobot_porthistory_plugin:history', # A reverse compatible link to follow.
link_text = 'MAC and IP on switches ports', # Text to display to user.
),
)
urls.py
from django.urls import path
from nautobot_porthistory_plugin import views
urlpatterns = [
path('history/', views.PortHistoryView.as_view(), name='history'),
]
views.py
from django.shortcuts import render
from nautobot.core.views import generic
from nautobot_porthistory_plugin import models, tables, filters, forms
class PortHistoryView(generic.ObjectListView):
"""Показывает MAC и IP адреса на портах"""
queryset = models.MAConPorts.objects.all()
table = tables.PortHistoryTable
filterset = filters.PortHistoryFilterSet
filterset_form = forms.PortHistoryFilterForm
action_buttons = ()
tables.py
import django_tables2 as tables
from django_tables2.utils import A
from nautobot.utilities.tables import BaseTable, ToggleColumn
from nautobot_porthistory_plugin import models
class PortHistoryTable(BaseTable):
pk = ToggleColumn()
device = tables.Column(linkify=True)
interface = tables.LinkColumn(orderable=False)
vlan = tables.LinkColumn()
ipaddress = tables.Column(linkify=True, verbose_name="IPv4 Address")
class Meta(BaseTable.Meta): # pylint: disable=too-few-public-methods
"""Meta attributes."""
model = models.MAConPorts
fields = (
'pk',
'device',
'interface',
'vlan',
'mac',
'ipaddress',
'updated',
)
forms.py
from django import forms
from nautobot.dcim.models import Region, Site, Device
from nautobot.ipam.models import VLAN
from nautobot.utilities.forms import BootstrapMixin, DynamicModelMultipleChoiceField
from nautobot.extras.forms import CustomFieldFilterForm
from nautobot_porthistory_plugin.models import MAConPorts
class PortHistoryFilterForm(BootstrapMixin, forms.Form):
"""Filter form to filter searches for MAC."""
model = MAConPorts
field_order = ["q", "site", "device_id", "vlan"]
q = forms.CharField(required=False, label="Search MAC")
site = DynamicModelMultipleChoiceField(
queryset=Site.objects.all(),
to_field_name="slug",
required=False,
)
device_id = DynamicModelMultipleChoiceField(
queryset=Device.objects.all(),
required=False,
label="Device",
query_params={"site": "$site"},
)
vlan = DynamicModelMultipleChoiceField(
queryset=VLAN.objects.all(),
required=False,
label="VLAN",
query_params={"site": "$site"},
)
filters.py
import django_filters
from nautobot.dcim.models import Device
from nautobot.utilities.filters import BaseFilterSet, MultiValueCharFilter
from django.db.models import Q
from nautobot_porthistory_plugin.models import MAConPorts
class PortHistoryFilterSet(BaseFilterSet):
"""Filter for MAConPorts"""
q = django_filters.CharFilter(method="search", label="Search MAC")
site = MultiValueCharFilter(
method="filter_site",
field_name="pk",
label="site",
)
device_id = MultiValueCharFilter(
method="filter_device_id",
field_name="pk",
label="Device (ID)",
)
vlan = MultiValueCharFilter(
method="filter_vlan",
field_name="pk",
label="VLAN",
)
class Meta:
"""Meta attributes for filter."""
model = MAConPorts
fields = [
'vlan'
]
def search(self, queryset, mac, value):
if not value.strip():
return queryset
mac = ''.join(ch for ch in value if ch.isalnum())
mac = ':'.join(mac[i:i+2] for i in range(0,len(mac),2))
return queryset.filter(Q(mac__icontains=mac))
def filter_site(self, queryset, name, id_list):
if not id_list:
return queryset
return queryset.filter(Q(device__site__slug__in=id_list) )
def filter_device_id(self, queryset, name, id_list):
if not id_list:
return queryset
return queryset.filter(Q(device__id__in=id_list) )
def filter_vlan(self, queryset, name, id_list):
if not id_list:
return queryset
return queryset.filter(Q(vlan__id__in=id_list) )
Вот такая получилась страница плагина. Можно фильтровать по сайту, по устройству, по VLAN. Можно искать по части MAC, причем формат ввода в поле поиска абсолютно не важен, хоть через нижние подчеркивания, хоть капсом через одну.
Какие выводы я для себя сделал
Написать свой плагин не так и сложно, как мне казалось в самом начале, когда я только изучал тему Netbox и Nautobot. Nautobot/Netbox представляют собой отличную платформу для автоматизации. Прикладывая минимум усилий можно достичь отличных результатов в деле адаптации Nautobot/Netbox к реалиям своей организации.
На этом всё, спасибо за внимание.
Ну и для тех, кому эта тема действительно интересна и кто дочитал до конца, ссылка на код плагина - https://github.com/iontzev/nautobot-porthistory-plugin