Кобра: создаем OSINT инструмент на Python, часть 1
Итак, каждый программист желает все автоматизировать — и не только программист. В этой статье мы рассмотрим создание OSINT-инструмента на Python.
Я хочу предупредить, что все показано в целях ознакомления, данные человека не подлежат хранению или распространению без соответствующего разрешения.
В этой статье мы создадим инструмент для:
- Получения информации об IP адресе
- Получения информации об номере телефона
- Получение информации, есть ли IP в черных листах DNS
- Парсер всех ссылок с сайта
- Сканер портов
- SYN-сканер портов
- Сканер сессий Meterpreter для Windows 7/10
- Virus Total API
- Сканер SQL инъекций
- Сканер XSS уязвимостей
- Генератор фейкового User-Agent
- Скрипт для изменения mac-адреса
Эта статья — первая часть, здесь мы рассмотрим минимум, а в следующей части доделаем код, улучшим его и сделаем полноценный CLI-инструмент, всю информацию по обновлению кода и уведомления о новых частях буду публиковать у себя в канале.
Данная статья ориентирована на продвинутых разработчиков, которые уже умеют писать и читать python-код.
Для всего этого нам нужен будет Python >3.9, Linux и пакетный менеджер pip.
Весь исходный код доступен по ссылке. Репозиторий локализирован на английский язык, в директории /docs/ru
есть markdown-версия статьи.
Создаем рабочее окружение
Никогда вам не советую устанавливать пакеты в саму систему. Для того чтобы работать над каждым проектом отдельно (дабы не возникало ошибок с пакетами и библиотеками) надо создавать виртуальные окружения.
Для того, чтобы создать виртуальное окружение, нам потребуется всего лишь одна команда:
python3 -m venv <название виртуального окружения>
# Например, создание виртуального окружения venv
python3 -m venv venv
После ввода данной команды у вас в текущем проекте будет создана новая директория — это и будет виртуальное окружение.
Но это еще не все — нам предстоит активировать его командой:
source venv/bin/activate
# Или, если у вас fish:
source venv/bin/activate.fish
И вот теперь мы можем спокойно работать с проектом и устанавливать пакеты. Для деактивации и выхода из окружения можно использовать команду deactivate
.
Установка зависимостей
Для начала, давайте установим пакет whois
. Он потребуется для того, чтобы мы могли узнать информацию об IP адресах.
sudo pacman -S whois # Arch
sudo apt install whois # Debian/Ubuntu
sudo dnf install whois # Fedora
sudo epm install whois # Alt
После активации нашего окружения, можно установить python-пакеты. Список нужных нам можно получить по ссылке в файле requirements.txt. Просто скачиваете его в проект и выполняете следующию команду:
pip3 install -r requirements.txt
Этим мы установили нужные нам зависимости. Теперь можно начать разработку!
Архитектура проекта
Давайте создадим архитектуру нашего инструмента:
- Директория
core
— базовый функционал для самого инструмента - Директория
modules
— главная папка всего проекта, здесь мы будем хранить модули для работы.
-
anonymity
— модуль анонимности, безопасности. Небольшой, чаще всего вспомогательный. Есть пока два компонента — генерация фейкового юзер-агента (fakeuseragent.py
) и изменение MAC-адреса устройства (machanger.py
) -
network_base
— вспомогательно-информационный модуль. Содержит информацию о сети, IP и портах. Один из самых больших модулей. Содержит следующие файлы: адрес из гео-координат, домен из IP, IP из домена, небольшие программы для работы с IPv4, ваш публичный IP, параметры сети, скрипт для пинга адреса, какой сервис на порту и т.д. -
osint
— модуль, как не странно, для OSINT и поиска информации. Здесь уже содержится прокси-файл для взаимодействия с модулем network_base, а также скрипты для получения информации об ip и номере телефона. -
scanners
— модуль для сканеров, как понятно из названия. Здесь есть сканер IP на наличие в черных списках DNS, парсер ссылок с сайта, сканер портов, поиск SQL-Injection и XSS уяизвимостей.
-
- Файл
main.py
— главный файл кода в корневом каталоге проекта.
Итак, в репозитории директория core является хранилищем трех файлов — это
-
highlight_schemes.py
— цветовые схемы Pygments. -
logger.py
— файл с классом логгера, дебаггера объектов. -
style.py
— файл с цветами, для того чтобы делать вывод красивее.
К сожалению, эти файлы мы не будем затрагивать — это совершенно другая тема. Если вы хотите, следующая статья будет об создании продвинутых CLI-программ на Python. А пока продолжим.
В репозитории modules есть огромное количество файлов и каталогов, я разберу здесь большую часть.
Создание модулей
Займемся созданием модулей. Начнем с модуля анонимности
Генерация фейкового User-Agent
Для этого мы будем использовать библиотеку fake-useragent
.
from fake_useragent import UserAgent
def generate_useragent() -> str:
fua = UserAgent()
return str(fua.random)
Здесь всего лишь одна функция — генерации случайного юзер-агента.
Изменение MAC-адреса
Для этого нам не нужны будут дополнительные библиотеки. Но вам возможно нужно будет установить набор инструментов net-tools
. Этот набор содержит в себе утилиту ifconfig, аналог ipconfig в Windows, который нам нужен будет для управления сетями.
import re
import subprocess
from random import choice
def ifconfig():
"""Возвращаем вывод команды ifconfig"""
output = subprocess.check_output(["sudo", 'ifconfig'])
return str(str(output.decode('utf-8')).replace(r'\n', '\n'))
def change_mac(interface: str, new_mac_address: str) -> None:
"""Функция для изменения MAC-адреса при помощи утилиты ifconfig
Аргументы:
+ interface: str - название интерфейса (ex. wlan0, wlp1s0, wlp3s0, eth)
+ new_mac_address - значение нового mac-адреса"""
print(f'[+] Выключение {interface}')
subprocess.call(["doas", "ifconfig", interface, "down"])
print(f'[+] Замена MAC-адреса {interface} на {new_mac_address}')
subprocess.call(["doas", 'ifconfig', interface, 'hw', 'ether', new_mac_address])
print(f'[+] Включение {interface}')
subprocess.call(["doas", "ifconfig", interface, "up"])
def get_random_mac_address() -> str:
"""Функция генерации и получения случайного mac-адреса.
У нас есть словарь с числами от 0 до 9, и латинских букв от a до f, а также начало
нового mac-адреса (00). После мы проходимся в итерационном цикле и добавляем 5 раз еще
по паре символов, а после возвращаем
Возвращает:
+ str - новый mac-адрес"""
characters = list("1234567890abcdef")
random_mac_address = "00"
for i in range(5):
random_mac_address += ':' + choice(characters) + choice(characters)
return random_mac_address
def get_current_mac(interface: str):
"""Получаем текущий MAC-адрес при помощи утилиты ifconfig"""
output = subprocess.check_output(["doas", "ifconfig", interface])
return re.search(r"\w\w:\w\w:\w\w:\w\w:\w\w:\w\w", str(output)).group(0)
def main(interface: str):
new_mac_address = get_random_mac_address()
try:
current_mac = get_current_mac(interface)
except subprocess.CalledProcessError as ex:
print(f'[!] Произошла ошибка. Скорее всего {interface} не существует')
print(f' !-> {ex}')
return
print(f'[+] Текущий MAC-адрес: {current_mac}')
print(f'[+] Замена MAC-адреса на {new_mac_address}')
change_mac(interface, new_mac_address)
Модуль network_base
Этот модуль, как я уже говорил, нужен для работы с сетью. Но весь код занимает слишком много места, поэтому посмотреть вы его можете по ссылке.
Обязательно советую использовать этот код, ибо он требуется для получения информации об IP. Можете просто скопировать код и изучить его, он обильно документирован.
Там вы увидите множество файлов — все они пригодятся.
Модуль OSINT
Теперь займемся самим поиском информации по открытым источникам. Займемся файлом ip.py
— он поведует нам некоторую информацию об IP адресе.
import os
import requests
def get_info_about_ip(ipaddr: str, fua: str) -> tuple:
"""Получаем информацию об IP адресе
Аргументы:
+ ipaddr: str - IP адрес
+ fua: str - фейковый юзер-агент"""
result = str()
try:
# Отправляем запрос
headers = {
'User-Agent': fua
}
info_data = requests.get(f'https://ipinfo.io/{ipaddr}/json', headers=headers).json()
except Exception as ex:
# При ошибке возвращаем саму ошибку.
return ex
# Получаем информацию об IP
whois_info = os.popen(f'whois {ipaddr}').read().strip()
result += f'IP: {info_data.get("ip")}\n'
result += f'City: {info_data.get("city")}\n'
result += f'Region: {info_data.get("region")}\n'
result += f'Country: {info_data.get("country")}\n'
result += f'Hostname: {info_data.get("hostname")}\n'
result += f'JSON data: {info_data}\n'
result += f'WhoIS: {whois_info}\n'
# возвращаем результат
return result
Следующий шаг — компонент netlib.py
. Он как раз и будет неким посредником между модулем OSINT и модулем network_base.
import modules.network_base.ipv4_local_cli as ipv4_cli
import modules.network_base.ipv4_local_getmac as ipv4_gm
import modules.network_base.ipv4_local_sock as ipv4_sock
import modules.network_base.router_ip as getaway
import modules.network_base.network_params as net_param
import modules.network_base.ip_from_domain as ip_domain
import modules.network_base.domain_from_ip as domain_ip
import modules.network_base.service_on_port as serv_port
import modules.network_base.my_public_ip as public_ip
import modules.network_base.ping_address as ping_addr
import modules.network_base.geolocation_ip as geo_ip
import modules.network_base.addr_from_geo as addr_geo
def url_info(url):
"""Информация об URL.
URL автоматически ретранслируется в IP"""
# Пинг адреса
print(f'Ping domain of address: {ping_addr.ping_addr(ip_domain.ip_from_domain(f"{url}"))}')
# Координаты
print(f"Coords by IP address: {geo_ip.geo_ip(domain_ip.domain_ip(ip_domain.ip_from_domain(f'{url}')))}")
# Физический адрес по координатам
print(f"Physical address by coords: "
f"{addr_geo.get_addr(geo_ip.geo_ip(domain_ip.domain_ip(ip_domain.ip_from_domain(f'{url}'))))}")
# IP из домена
print(f"IP-address of domain {url}: {ip_domain.ip_from_domain(f'{url}')}")
# Домен из IP
print(f"Domain name {url} by IP: {domain_ip.domain_ip(ip_domain.ip_from_domain(f'{url}'))}")
def ip_info(ip: str) -> str:
"""Информация об IP"""
print(f'Ping domain or address: {ping_addr.ping_addr(ip)}')
print(f"Coords by IP: {geo_ip.geo_ip(ip)}")
print(f"Geo Addr Coords by IP: {addr_geo.get_addr(geo_ip.geo_ip(domain_ip.domain_ip(ip)))}")
print(f"Domain name: {domain_ip.domain_ip(ip)}")
def check_network():
"""Проверяем сетевые параметры"""
# Локальный IP
print(f'Local IP (cli): {ipv4_cli.local_ipv4()}')
print(f'Local IP (gm): {ipv4_gm.local_ipv4()}')
print(f'Local IP (sock): {ipv4_sock.local_ipv4()}')
# IP шлюза
print(f'IP router: {getaway.router_ip()}')
# Параметры сети
print(f'Network interface parameters:\n{net_param.network_param()}')
# Название сервиса работающего на 80 порту
print(f'Name of service work on port 80: {serv_port.type_port(80)}')
# Публичный IP
print(f'Public IP: {public_ip.public_ip()}')
Следующий компонент — phone.py
, который отвечает за небольшой поиск информации об номере телефона.
import requests
def get_info_phonenumber(phonenumber, fua):
"""Получение информации о номере телефона
Аргументы:
+ phonenumber - номер телефона
+ fua - фейковый юзер-агент"""
result = ''
try:
# Делаем запрос к API
url = f"https://htmlweb.ru/geo/api.php?json&telcod={phonenumber}"
headers = {
'User-Agent': fua
}
info_data = requests.get(url, headers=headers).json()
except Exception as ex:
return ex
result += f'Country: {info_data["country"]["name"]}\n'
result += f'Region: {info_data["region"]["name"]}\n'
result += f'Subregion: {info_data["region"]["okrug"]}\n'
result += f'Operator: {info_data["0"]["oper"]}\n'
result += f'Location: {info_data["country"]["location"]}\n'
return result
После этого можно заняться следующим компонентом — whois_information.py
:
#!venv/bin/python3
import socket
import time
from ipaddress import IPv4Address, AddressValueError
from datetime import datetime
import ipwhois
import whois
def ipwhois_info(ip: str):
"""Информация о IP по IPWhois
+ ip: str - IP адрес"""
results = ipwhois.IPWhois(ip).lookup_whois()
print(results)
print("\n")
def whois_info(ip):
"""Информация о IP по WhoIs
+ ip: str - IP адрес"""
results = whois.whois(ip)
print(results)
def ianna(ip):
"""Информация о IP через whois.iana.org
+ ip: str - IP адрес"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("whois.iana.org", 43))
s.send((ip + "\r\n").encode())
response = b""
while True:
data = s.recv(4096)
response += data
if not data:
break
s.close()
whois = ''
for resp in response.decode().splitlines():
if resp.startswith('%') or not resp.strip():
continue
elif resp.startswith('whois'):
whois = resp.split(":")[1].strip()
break
return whois if whois else False
def get_whois(ip, whois):
"""Получения информации о IP"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((whois, 43))
s.send((ip + "\r\n").encode())
response = b""
while True:
data = s.recv(4096)
response += data
if not data:
break
s.close()
whois_ip = dict()
num = 0
for ln in response.decode().splitlines():
if ln.strip().startswith("%") or not ln.strip():
continue
else:
if ln.strip().split(": ")[0].strip() in ['created', 'last-modified']:
dt = datetime.fromisoformat(ln.strip().split(": ")[1].strip()).strftime("%Y-%m-%d %H:%M:%S")
whois_ip.update({f'{ln.strip().split(": ")[0].strip()}_{num}': dt})
num += 1
else:
whois_ip.update({ln.strip().split(": ")[0].strip(): ln.strip().split(": ")[1].strip()})
return whois_ip if whois_ip else False
def validate_request(ip):
"""Проверка IP
+ ip - IP адрес"""
try:
IPv4Address(ip)
if whois := ianna(ip):
time.sleep(1)
if info := get_whois(ip, whois):
print(info)
else:
print("Не была получена информация")
else:
if info := get_whois(ip, 'whois.ripe.net'):
print(info)
else:
print("Не была получена информация")
except AddressValueError:
print("IP адрес не валидный")
except ConnectionResetError as ex:
print(ex)
После этого давайте займемся предпоследним компонентом модуля OSINT — virus_total.py
для анализа бинарников на вирусы, трояны и другие малвари.
Для начала вам потребуется авторизоваться на сайте Virus Total. После активации аккаунта, откройте меню пользователя и перейдите в секцию API Key. Ниже вы увидете сам API ключ:
Не забудьте про ограничения бесплатного API-ключа:
- Частота запросов — 4 поиска / минута
- Дневная квота — 500 поисков / день
- Месячная квота — 15.5 K поисков / месяц
После просто скопируйте и вставьте в код. Есть разные практики хранить важные ключи в коде. Давайте рассмотрим три способа:
- Первый и простой способ. Просто вставить ключ прямо в коде
APIKEY='123456abcdef'
# Здесь ваш код
# ...
Но я категорически не советую так делать. Если вы забудете удалить ваш ключ, то он может оказаться в руках других людей. Они смогут получить доступ к вашему проекту, боту, аккаунта и т.д., в зависимости от возможностей API. Поэтому так делайте только в локальных проектах (да и даже так лучше не делать по стилю кода).
-
Создать файл настроек/конфигов, например
settings.py
и в нем хранить все важные ключи, а после импортировать в проект. Намного лучше чем 1 способ, но проблемы с безопасностью также остаются.
-
Переменные окружения. Безопасный способ, мы храним API не в коде, а в переменных окружения. Этот способ мы и будем использовать.
Вам будет надо установить пакет python-dotenv
:
pip install python-dotenv
И после создайте в корневом каталоге файл .env
, и в него вставьте следующий код:
VIRUS_TOTAL_KEY = "ваш ключ"
После этого давайте немного изменим концепцию, и создадим микро-антивирус. Он будет сперва проверять хеш файла в базе сигнатурах. Да, просто сопоставлять, и это не будет стоять рядом даже с самыми плохими антивирусами — ведь стоит добавить в бинарник малваря хотя-бы один байт, даже если это будет 0, то уже наш антивирус ничего не заподозрит. Но если малварь и пройдет сигнатурный анализ, то дальше его ждет Virus Total!
И еще, мы не будем сами посылать запросы, и парсить результат, мы будем использовать библиотеку vt-py.
А также для локальной проверки файлов нам нужны будут сигнатуры. Собрал я их в каталоге res, доступный по ссылке.
import os
import json
import requests
from dotenv import load_dotenv
import os
from time import perf_counter
from hashlib import sha256, sha1, md5
from colorama import Fore, Style
import vt
import json
# загружаем переменные окружения
load_dotenv()
# получаем API-ключ
VIRUS_TOTAL_API_KEY = os.getenv("VIRUS_TOTAL_KEY")
# Сигнатуры
signature_resources = ['res/signatures.txt', 'res/signatures2.txt',
'res/signatures3.txt', 'res/signatures4.txt']
# для своих требований, туда можно поместить хэши зловредов, которых нету в обычных списках сигнатур. А также описание и название.
signature_resource_info = 'res/signatures_info.json'
def add_signature(new_signature: dict) -> None:
"""Добавление сигнатуры в базу"""
with open(signature_resource_info, 'r') as file:
signatures_info = json.load(file)
for signature in new_signatures:
signatures_info['name'] = new_signatures['name']
signatures_info['desc'] = new_signatures['desc']
signatures_info['date'] = new_signatures['date']
with open(signature_resource_info, 'a') as file:
json.dump(signatures_info, file, indent=4)
return None
def rewrite_signatures(signatures: dict) -> bool:
"""Перезапись БД сигнатур"""
try:
with open(signature_resource_info, 'w') as file:
json.dump(signatures, file, indent=4)
except Exception as e:
return False
else:
return True
def get_info_signature(signature: str) -> str:
"""Получение информации об сигнатуре из БД"""
with open(signature_resource_info, 'r') as file:
signatures_info = json.load(file)
try:
signature_info = f'''Сигнатура: {signature}
Название угрозы: {signatures_info[signature]["name"]}
Описание угрозы: {signatures_info[signature]["desc"]}
Дата обнаружения угроза: {signatures_info[signature]["date"]}'''
except IndexError:
signature_info = f'По сигнатуре {signature} еще нету информации в наших базах данных'
finally:
return signature_info
def scan_file(filename: str, delete_file: bool, signature_resources: list, VT_API_KEY: str) -> None:
"""Сканируем файл
+ filename: str - путь до файла
+ delete_file: bool - удалять ли файл
+ signature_resources: list - список ресурсов сигнатур
+ VT_API_KEY: str - API ключ Virus Total"""
result = f'Сканируем {filename} на наличие угроз...\n'
try:
print('Load singatures...')
start = perf_counter()
shahash = sha256()
sha1hash = sha1()
md5hash = md5()
# Получаем хэши
with open(filename, 'rb') as file:
while True:
data = file.read()
if not data:
break
shahash.update(data) # sha256
sha1hash.update(data) # sha1
md5hash.update(data) #md5
result1 = shahash.hexdigest()
result2 = sha1hash.hexdigest()
result3 = md5hash.hexdigest()
result += f'Проверяем наличие хешей "{result1}", "{result2}", "{result3}" (sha256, sha1, md5) в сигнатурах...\n'
# Читаем сигнатуры
with open(signature_resources[0], 'r') as r:
signatures = list(r.read().split('\n'))
with open(signature_resources[1], 'r') as r:
for sign_hash2 in list(r.read().replace(';', '').split('\n')):
signatures.append(sign_hash2)
with open(signature_resources[2], 'r') as r:
for sign_hash3 in list(r.read().replace(';', '').split('\n')):
signatures.append(sign_hash3)
with open(signature_resources[3], 'r') as r:
for sign_hash4 in list(r.read().replace(';', '').split('\n')):
signatures.append(sign_hash4)
result += f'''Сканирование {filename} на Virus Total...\n'''
print('Loading Virus Total...')
client = vt.Client(VT_API_KEY)
# анализ файла
with open(filename, "rb") as f:
analysis = client.scan_file(f, wait_for_completion=True)
file = client.get_object(f"/files/{result1}")
stats = file.last_analysis_stats
result += f'VirusTotal: {filename}\t size: {file.size} bytes\n'
result += f'Безвредный: {stats["harmless"]}\n'
result += f'Неподдерживаемый тип: {stats["type-unsupported"]}\n'
result += f'Подозрительный: {stats["suspicious"]}\n'
result += f'Отказ: {stats["failure"]}\n'
result += f'Злонамеренный: {stats["malicious"]}\n'
result += f'Безопасный: {stats["undetected"]}\n'
print('Get final info...')
if result1 in signatures or result2 in signatures or result3 in signatures:
end = perf_counter()
total = end - start
result += f'Найдена угроза в файле {filename} (поиск по сигнатурам)\n'
# получаем информацию о файле, если он есть в сигнатурах
if result in signatures:
info = get_info_signature(result)
result += f'Информация: {info}\n'
elif result2 in signatures:
info = get_info_signature(result2)
result += f'Информация: {info}\n'
elif result3 in signatures:
info = get_info_signature(result3)
result += f'Информация: {info}\n'
# удаление файла
if delete_file:
os.remove(filename)
result += f'[!] Файл {filename} удален!\n'
result += f'Время работы: {(total):.07f}s\n'
else:
end = perf_counter()
total = end - start
result += 'Угроз не найдено\n'
result += f'Время работы: {(total):.07f}s\n'
except FileNotFoundError:
result += f'[!] Файл не найден\n'
except PermissionError:
result += f'[!] Ошибка прав доступа к файлу\n'
print('End')
return result
res = scan_file('<любой файл>', False, signature_resources, VIRUS_TOTAL_API_KEY)
print(res)
Вот и все — при запуске этого скрипта мы можем спокойно анализировать файлы на Virus Total.
Модуль scanners
Модуль всеразличных сканеров и парсеров. ПОка есть 6 — поиск IP в черных списках DNS, парсер ссылок, сканер портов, сканер на SQL-Injection и XSS уязвимости. А также есть еще детектор ARP-спуфинга и сканер сессий meterpreter для Windows.
Займемся детектором ARP спуфинга. ARP-spoofing — разновидность сетевой атаки типа MITM, применяемая в сетях с использованием протокола ARP. В основном применяется в сетях Ethernet. Атака основана на недостатках протокола ARP.
Нам нужен будет модуль scapy.
from scapy.all import sniff
class ARPSpoofingDetector:
def __init__(self):
self.IP_MAC_Map = {}
def process_packet(self):
src_IP = packet['ARP'].psrc
src_MAC = packet['Ether'].src
if src_MAC in self.IP_MAC_Map.keys():
if self.IP_MAC_Map[src_MAC] != src_IP :
try:
old_IP = self.IP_MAC_Map[src_MAC]
except:
old_IP = "Неизвестный"
message = f'''ARP атака замечена
Это возможно со стороны машины с IP адресом {old_IP} для {src_IP}'''
return message
else:
self.IP_MAC_Map[src_MAC] = src_IP
def sniffing(self):
# "Нюхаем" пакеты
sniff(count=0, filter="arp", store=0, prn=self.process_packet)
arpspoof_detector = ARPSpoofingDetector()
arpspoof_detector.sniffing()
Следующий шаг — сканер meterpreter сессий для Windows 7 и 10.
import re
import subprocess
from typing import List, Dict
WIN_7_SIGNATURE = ["WINBRAND.dll", "WINHTTP.dll", "webio.dll", "SspiCli.dll", "cscapi.dll"]
WIN_10_SIGNATURE = ["rsaenh.dll", "netapi32.dll", "wkscli.dll", "psapi.dll", "cscapi.dll"]
REG_FOR_WINDOWS_VERSION = r'(?:Windows\s+)(\d+|XP|\d+\.\d+)'
REG_FOR_EXE_PROCESSES = r'(?<=\\r\\n)[A-Za-z]+\.exe\s+\d+'
REG_FOR_LOCAL_SOCKET = r'(?:TCP\s+)(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5})'
REG_FOR_REMOTE_SOCKET = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5})\s+[A-Z]+\s+(\d+)'
CMD_NETSTAT_COMMAND = 'netstat -aon |find /i "established"'
CMD_TASKLIST_COMMAND = "tasklist /M"
class MeterpreterScanner:
def __init__(self):
self._signatures: List[str] = []
self._processes_with_signatures: List[str] = []
self._processes_with_dynamic_port: List[str] = []
self._suspicious_processes: Dict[str:List[str]] = {}
def _check_windows_version(self) -> None:
"""Проверка версии windows - 7 или 10"""
info = subprocess.check_output("systeminfo", shell=True)
win = re.findall(REG_FOR_WINDOWS_VERSION, str(info))
if win[0] == '10':
self._signatures = WIN_10_SIGNATURE
elif win[0] == '7':
self._signatures = WIN_7_SIGNATURE
else:
print("[X] Только Windows 7 или Windows 10")
def _search_process_with_dll(self, dll: str) -> None:
"""Поиск процессов с DLL библиотеками"""
output_tasklist = subprocess.check_output(f"{CMD_TASKLIST_COMMAND} {dll}", shell=True)
process_list = re.findall(REG_FOR_EXE_PROCESSES, str(output_tasklist))
for process_info in process_list:
process, process_PID = re.split(r'\s+', process_info)
if process in self._suspicious_processes:
self._suspicious_processes[f'{process}_{process_PID}'].append(dll)
else:
self._suspicious_processes[f'{process}_{process_PID}'] = [dll]
def _check_suspicious_process(self) -> None:
"""Сканируем подозрительные процессы"""
self._check_windows_version()
for dll in self._signatures:
self._search_process_with_dll(dll)
for proc_info in self._suspicious_processes.items():
proc_name, proc_dlls = proc_info
if len(proc_dlls) == 5:
print(f"[-] Найден подозрительный процесс : {proc_name}")
self._processes_with_signatures.append(proc_name)
if not self._processes_with_signatures:
print("[+] Meterpreter сигнатура не найдена в памяти")
def _scan_suspicious_ports(self) -> None:
"""Сканируем подозрительные порты"""
scan_output = subprocess.check_output(CMD_NETSTAT_COMMAND, shell=True)
local_sockets = re.findall(REG_FOR_LOCAL_SOCKET, str(scan_output))
for l_socket in local_sockets:
l_ip, l_port = l_socket.split(':')
if int(l_port) >= 49152:
if l_ip != "127.0.0.1":
victim_socket = f"{l_ip}:{l_port}"
data_with_suspicious_socket = scan_output.decode().split(victim_socket)
suspicious_info = re.findall(REG_FOR_REMOTE_SOCKET, data_with_suspicious_socket[1])[0]
suspicious_socket, suspicious_PID = suspicious_info
# порт 4444 используется по умолчанию в MSF и meterpreter
if int(suspicious_socket.split(':')[-1]) == 4444:
print(f"[!] Найдено MSF подключение: {suspicious_socket}")
print(f"[-] Подключение {victim_socket} к {suspicious_socket} "
f"использование динамический PID - {suspicious_PID}")
self._processes_with_dynamic_port.append(suspicious_PID)
def finding_meterpreter_sessions(self):
"""Находим сессии meterpreter"""
found = False
self._check_suspicious_process()
self._scan_suspicious_ports()
# ищем процесс
for proc in self._processes_with_signatures:
proc_name, proc_PID = proc.split('_')
if proc_PID in self._processes_with_dynamic_port:
print(f'[!] Найдено совпадение в {proc_name} с PID {proc_PID}')
found = True
if not found:
print(f"[+] Совпадений не найдено")
MeterpreterScanner().finding_meterpreter_sessions()
Теперь займемся сканером черных листов DNS, а точнее будем искать целевой IP в этих списках.
import socket
import re
from ipaddress import ip_network, ip_address
from dns import resolver
from requests import get, exceptions
def ip_in_range(ip, addr):
"""Сканирование IP"""
if ip_address(ip) in ip_network(addr):
return True
return False
def cloudfare_detect(ip):
"""Сканирование в Cloudfare"""
list_addr = ["104.16.0.0/12"]
url = 'https://www.cloudflare.com/ips-v4'
req = get(url=url)
for adr in req.text.split("\n"):
list_addr.append(adr)
for addr in list_addr:
detect = ip_in_range(ip, addr)
if detect:
return True
return False
def public_ip():
"""Получить публичный IP"""
try:
return get('https://api.ipify.org/').text
except exceptions.ConnectionError:
return '127.0.0.1'
def dns_bl_check(ip):
bad_dict = dict()
result = str()
req = get('https://raw.githubusercontent.com/evgenycc/DNSBL-list/main/DNSBL')
read = req.text.splitlines()
for serv in read:
print(f'Checking... {serv}')
req = f"{'.'.join(reversed(ip.split('.')))}.{serv.strip()}"
try:
resolv = resolver.Resolver()
resolv.timeout = 5
resolv.lifetime = 5
resp = resolv.resolve(req, 'A')
resp_txt = resolv.resolve(req, 'TXT')
result += f'{serv.strip():30}: [BAD]\n'
pattern = r'(?:https?:\/\/)?(?:[\w\.]+)\.(?:[a-z]{2,6}\.?)(?:\/[\w\.]*)*\/?'
find = re.findall(pattern, str(resp_txt[0]))
if len(find) == 0:
find = ['No address']
bad_dict.update({serv.strip(): f'{resp[0]} {find[0]}'})
except resolver.NXDOMAIN:
result += f'{serv.strip():30}: [OK]\n'
except (resolver.LifetimeTimeout, resolver.NoAnswer):
continue
except Exception as ex:
raise ex
return ('error', ex)
if len(bad_dict) > 0:
result += f'{ip} is found in black list\n'
for bad in bad_dict:
result += f' - {bad:30} : {bad_dict[bad]}\n'
else:
result += 'IP is not found in black list\n'
return (result, 0)
def check_ip_in_black_list(addr_input):
result = ''
result += f'Your public IP: {public_ip()}\n'
if addr_input.lower() == "x":
exit(0)
ip = ''
try:
ip = socket.gethostbyname(addr_input)
except socket.gaierror as ex:
return ex
if cloudfare_detect(ip):
result += f'Cloudfare detected: {ip}\n'
else:
result += 'Cloudfare not detected\n'
dnsbl_list = dns_bl_check(ip)
if dnsbl_list[0] is None or dnsbl_list[0] == 'error':
return result
else:
result += dnsbl_list[0]
return result
check_ip_in_black_list('<IP адрес>')
Теперь парсер ссылок:
import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup
internal_urls = set()
external_urls = set()
def is_valid(url):
"""Данная функция проверяет, валиден ли URL
Аргументы:
url - ссылка на страницу
"""
parsed = urlparse(url)
return bool(parsed.netloc) and bool(parsed.scheme)
def get_all_website_links(url):
"""Получаем все ссылки с сайта"""
urls = set()
domain_name = urlparse(url).netloc
soup = BeautifulSoup(requests.get(url).content, "html.parser")
for a_tag in soup.findAll("a"):
href = a_tag.attrs.get("href")
if href == "" or href is None:
continue
href = urljoin(url, href)
parsed_href = urlparse(href)
href = parsed_href.scheme + "://" + parsed_href.netloc + parsed_href.path
if not is_valid(href):
continue
if href in internal_urls:
continue
if domain_name not in href:
if href not in external_urls:
print(f"[!] Внешняя ссылка: {href}")
external_urls.add(href)
continue
print(f"[*] Внутренняя ссылка: {href}")
urls.add(href)
internal_urls.add(href)
return urls
def crawl(url, max_urls=30):
"""Парсинг"""
total_urls_visited = 0
total_urls_visited += 1
print(f"[*] Работа над {url}")
links = get_all_website_links(url)
for link in links:
if total_urls_visited > max_urls:
break
crawl(link, max_urls=max_urls)
def get_links(url, max_urls=30):
"""Получаем ссылки"""
crawl(url, max_urls)
print("[+] Всего внутренних ссылок:", len(internal_urls))
print("[+] Всего внешних ссылок:", len(external_urls))
print("[+] Всего ссылок", len(external_urls) + len(internal_urls))
print("[+] Всего проверенных ссылок:", max_urls)
Как видите, ничего сложного.
А мы едем дальше — следующий шаг это сканер портов:
import socket
from datetime import datetime
def scan_ports(hostname):
start = datetime.now()
ports = {
20: "FTP-DATA", 21: "FTP", 22: "SSH", 23: "Telnet",
25: "SMTP", 43: "WHOIS", 53: "DNS", 80: "http",
115: "SFTP", 123: "NTP", 143: "IMAP", 161: "SNMP",
179: "BGP", 443: "HTTPS", 445: "MICROSOFT-DS",
514: "SYSLOG", 515: "PRINTER", 993: "IMAPS",
995: "POP3S", 1080: "SOCKS", 1194: "OpenVPN",
1433: "SQL Server", 1723: "PPTP", 3128: "HTTP",
3268: "LDAP", 3306: "MySQL", 3389: "RDP",
5432: "PostgreSQL", 5900: "VNC", 8080: "Tomcat", 10000: "Webmin"
}
open_ports = []
closed_ports = []
[i for i in range(1, 65355)]
ip = socket.gethostbyname(hostname)
for port in range(65535):
cont = socket.socket()
cont.settimeout(1)
# попытаемся подключиться
try:
cont.connect((ip, port))
except socket.error:
print(f'[!] Порт {port} закрыт')
closed_ports.append([port])
else:
# выводим тип порта
try:
print(f"[{socket.gethostbyname(ip)}:{str(port)}] открыт/{ports[port]}")
open_ports.append({port, ports[port]})
except KeyError:
print(f"[{socket.gethostbyname(ip)}:{str(port)}] открыт/неизвестный тип")
open_ports.append({port, 'неизвестный тип'})
finally:
cont.close()
ends = datetime.now()
print("<Время работы:{}>".format(ends - start))
print('[+] Закрытые порты: ')
for closed_port in closed_ports:
print(closed_port, end=', ')
print()
print('[+] Открытые порты: ')
for open_port in open_ports:
print(open_port)
scan_ports('<IP адрес>')
Следующий шаг — поиск SQL-Injection уязвимостей. Внедрение SQL-кода — один из распространённых способов взлома сайтов и программ, работающих с базами данных, основанный на внедрении в запрос произвольного SQL-кода.
import requests
from bs4 import BeautifulSoup as bs
from urllib.parse import urljoin
from pprint import pprint
def get_session(user_agent):
session = requests.Session()
session.headers["User-Agent"] = user_agent
return session
def get_all_forms(session, url):
"""Дается `url`, и это возвращаем весь html-контент с формами"""
soup = bs(session.get(url).content, "html.parser")
return soup.find_all("form")
def get_form_details(form):
"""
Эта функция получает всю доступную информацию о форме
"""
details = {}
# получаем действие формы (url цели)
try:
action = form.attrsession.get("action").lower()
except:
action = None
# получаем метод формы (POST, GET, etc.)
method = form.attrsession.get("method", "get").lower()
# получаем все детали ввода
inputs = []
for input_tag in form.find_all("input"):
input_type = input_tag.attrsession.get("type", "text")
input_name = input_tag.attrsession.get("name")
input_value = input_tag.attrsession.get("value", "")
inputsession.append({"type": input_type, "name": input_name, "value": input_value})
# добавляем информацию в словарь деталей
details["action"] = action
details["method"] = method
details["inputs"] = inputs
return details
def is_vulnerable(response):
"""Простая логическая функция, определяющая, является ли страница
уязвима ли SQL-инъекция из-за ее «ответа»"""
errors = {
# MySQL
"you have an error in your sql syntax;",
"warning: mysql",
# SQL Server
"unclosed quotation mark after the character string",
# Oracle
"quoted string not properly terminated",
}
for error in errors:
# Если мы находим ошибку, то возвращаем True
if error in response.content.decode().lower():
return True
# Нету ошибок
return False
def scan_sql_injection(session, url):
"""Сканирование URL на нахождение уяизвимостей для SQL-инъекции"""
for c in "\"'":
# Добавляем кавычку к URL
new_url = f"{url}{c}"
print("[!] Попытка нахождения уязвимости SQL-инъекции на", new_url)
# Создаем HTTP-запрос
res = session.get(new_url)
if is_vulnerable(res):
# SQL-инъекция обнаружена на самом URL-адресе,
# нет необходимости предварительно извлекать формы и отправлять их
print("[+] SQL инъекция найдена, ссылка:", new_url)
return
# Анализ форм
forms = get_all_forms(session, url)
print(f"[+] Найдено {len(forms)} форм на {url}.")
for form in forms:
form_details = get_form_details(form)
for c in "\"'":
# данные тела для отправки
data = {}
for input_tag in form_details["inputs"]:
if input_tag["type"] == "hidden" or input_tag["value"]:
# любая форма ввода, которая скрыта или имеет некоторое значение,
# просто используем его в теле формы
try:
data[input_tag["name"]] = input_tag["value"] + c
except:
pass
elif input_tag["type"] != "submit":
# все остальные, кроме отправки, используют некоторые ненужные данные со специальным символом
data[input_tag["name"]] = f"test{c}"
# добавляем к URL ссылку с действием из формы
url = urljoin(url, form_details["action"])
if form_details["method"] == "post":
res = session.post(url, data=data)
elif form_details["method"] == "get":
res = session.get(url, params=data)
if is_vulnerable(res):
print("[+] SQL Injection vulnerability detected, link:", url)
print("[+] Form:")
pprint(form_details)
break
def scanning(fua, url):
"""fua - юзер агент, url - ссылка на сайт"""
session = get_session(fua)
scan_sql_injection(session, url)
Займемся поиском XSS-уязвимостей (межсайтовый скриптинг). XSS — тип атаки на веб-системы, заключающийся во внедрении в выдаваемую веб-системой страницу вредоносного кода и взаимодействии этого кода с веб-сервером злоумышленника. Является разновидностью атаки «Внедрение кода». Википедия
import requests
from pprint import pprint
from bs4 import BeautifulSoup as bs
from urllib.parse import urljoin
def get_all_forms(url, fua):
"""Вводится `url`, это возвращает все формы из HTML-контента
fua - фейковый юзер агент"""
headers = {
'User-Agent': fua
}
soup = bs(requests.get(url, headers=headers).content, "html.parser")
return soup.find_all("form")
def get_form_details(form):
"""Данная функция получает все возможные данные из HTML-формы"""
details = {}
# Получаем действие формы
action = form.attrs.get("action", "").lower()
# Получаем все методы формы (POST, GET, etc.)
method = form.attrs.get("method", "get").lower()
# Получаем все инпуты
inputs = []
for input_tag in form.find_all("input"):
input_type = input_tag.attrs.get("type", "text")
input_name = input_tag.attrs.get("name")
inputs.append({"type": input_type, "name": input_name})
# Добавляем в детали
details["action"] = action
details["method"] = method
details["inputs"] = inputs
return details
def submit_form(form_details, url, value):
"""
Отправляем данные формы
Возвращает HTTP ответ
"""
target_url = urljoin(url, form_details["action"])
inputs = form_details["inputs"]
data = {}
for input in inputs:
# заменить весь текст и значения поиска на `value`
if input["type"] == "text" or input["type"] == "search":
input["value"] = value
input_name = input.get("name")
input_value = input.get("value")
if input_name and input_value:
data[input_name] = input_value
print(f"[+] Отправка вредоносной полезной нагрузки на {target_url}")
print(f"[+] Данные: {data}")
if form_details["method"] == "post":
return requests.post(target_url, data=data)
else:
# GET request
return requests.get(target_url, params=data)
def scan_xss(url, fua):
"""
Получив `url`, он выводит все формы, уязвимые для XSS, и
возвращает True, если кто-то из них уязвим, False в противном случае
"""
forms = get_all_forms(url, fua)
print(f"[+] Найдено {len(forms)} форм на {url}.")
js_script = "<Script>alert('Detected')</script>"
is_vulnerable = False
for form in forms:
form_details = get_form_details(form)
content = submit_form(form_details, url, js_script).content.decode()
if js_script in content:
print(f"[+] XSS найдена на {url}")
print("[*] Детали формы:")
pprint(form_details)
is_vulnerable = True
return is_vulnerable
if __name__ == "__main__":
url = "https://xss-game.appspot.com/level1/frame"
print(scan_xss(url, '<юзер агент>'))
Вот и все. Осталось это скомбинировать в одну программу на Python, считайте что это ваше домашнее задание. Попробуйте улучшить код, улучшить вывод сообщений и многое другое. А как насчет асинхронности? Обо всем этом мы будем говорить во второй части статьи — там мы будем красоту наводить, и пытаться оптимизировать.
Заключение
Итак, в этой статье мы смогли написать набор OSINT (да и не только OSINT) инструментов. Оставляйте ваши комментарии, критику. Я понимаю, что всем не угодить, и вы можете высказаться в комментариях.
Я хочу предупредить, что весь мой код был скоплен за несколько месяцев кодинга. Если вы обнаружили код, который был уже в другой статье, прошу оставить комментарий, я внесу источник в статью. Благодарю за понимание!
Больше Python кода, гайдов и полезных инструментов и гайдов вы можете найти в моем канале, заходите в гости.
Спасибо за внимание! Надеюсь вам понравилась статья!
Ссылки
-
GitHub-репозиторий. Внимание, он в разработке, поэтому пока не все работает, но я постоянно обновляю.
Комментарии (14)
vasyakolobok77
25.05.2024 13:02+3Ощущение, что код и статью делал школьник с помощью chat-gpt:
-- бессмысленный модуль для генерации юзерагента, который можно сгенерировать по шаблону кодом в одну строчку;
-- бессмысленные замены \n на \n;
-- топорная регулярка для MAC, навскидку более правильная и простая регулярка: [0-9a-f]{2}(:[0-9a-f]{2}){5}
-- для генерации мака тоже топорный алгоритм вместо того чтобы сгенерить 6 рандом байт и потом сделать toHex;-- топорное и неговорящее именование переменных / функций;
-- одновременное использование raise и return, а местами return error вместо raise;
-- сделана проверка на 127.0.0.1 но забыто, что вся подсеть 127/8 является исключением, и кроме нее есть еще десяток подсетей, которые имеют особое назначение;
вобщем идея может быть и интересная, но реализация на уровне "как не надо писать код"
DrArgentum
25.05.2024 13:02ChatGPT я никогда не использую для написания статей, а также для генерации кода. Максимум что я делал - это генерация регулярных выражений, ибо я в них ноль
return error я использовал потому что мне не требовалось исключение.
Спасибо за заметки, негативный опыт - тоже опыт
Grigory_Otrepyev
25.05.2024 13:02+3Для этого нам не нужны будут дополнительные библиотеки. Но вам возможно нужно будет установить набор инструментов
net-tools
. Этот набор содержит в себе утилиту ifconfig, аналог ipconfig в Windows, который нам нужен будет для управления сетями.OSINT framework focused on gathering information from free tools or resources.
1 Никакого отношения OSINT к замене MAC не имеет
2 Связка MAC-PORT приедет в ELK > SOC и последует звонок от СБ.{info_data.get("hostname")}
не получите, что за школотронство.
print(f'Name of service work on port 80: {serv_port.type_port(80)}')
и что ?
Займемся детектором ARP спуфинга. ARP-spoofing — разновидность сетевой атаки типа MITM, применяемая в сетях с использованием протокола ARP. В основном применяется в сетях Ethernet.
И где вы в 2024 другие видели ?
вы можете найти в моем канале,
а, понятно.
alfa41
25.05.2024 13:02+2Автор забыл упомянуть, что у Virustotal огромнейшая база исторических DNS записей, доступная по тому же API. Это "маст-хэв" для OSINT. Dnskron, к примеру, одна из реализаций такого скрипта, способного последовательно вытащить сотни связанных доменов через связь типа "domain-ip"
olegtsss
25.05.2024 13:02В комментариях под статьей налито много негатива. Не сочтите, за еще одну ложку дегтя, но я бы хотел обратить внимание на саму идею создания подобного рода комбайнов. С одной стороны, это удобно, все под рукой. С другой стороны, куча зависимостей от сторонних ресурсов, которые со временем будут изменяться. Создание подобного рода инструментов вынуждает всегда их поддерживать в актуальном состоянии и желательно следить за изменениями на стороне внешних ресурсов. Да и учесть у себя все имеющиеся возможности не выйдет. Итого имеем сборник "простейших техник". Следовательно встает вопрос, а стоит ли создавать такого рода комбайны. Мой ответ, к которому я пришел со временем - не стоит.
alfa41
25.05.2024 13:02Придерживаюсь того же мнения. Один скрипт под одну задачу. Но в то же время есть смысл обьединять по типу обьекта, к примеру, IP или домен, и собирать данные с разных API в рамках одного скрипта. Как показывает практика, разные источники могут дополнять друга.
elixirkmc
25.05.2024 13:02Не очень понятно, что дает подобный просмотр ARP пакетов, если подключиться в сеть, в которой уже активен спуфер. В зависимости от порядка получения пакетов, можно перепутать корректный ответ с подменным.
P.S. По коду: в обработчике пакета не указан никакой аргумент.
DrArgentum
А мне нравится, код вполне приятный, да и местами полезный.
andrezh
Так это же ваша статья и ваш код, судя по гитхабу AlexeevDeveloper.
Код, кстати, плохой.
DrArgentum
А расскажите пожалуйста почему код плохой? Я использовал анализатор ruff и пытаюсь соблюдать pep 8
А если вы про то, что сканеры xss уязвимостей и SQL инъекций слишком простые и банальные, то я с вами буду согласен
andrezh
Структура, стиль. Все плохо. Линтер навыков не добавляет. Советую использовать Copilot.
p.s. А разве на Хабре разрешено клоноводство для рекламы телеграм каналов?
DrArgentum
Я написал статью, но я не владелец телеграм канала
Grigory_Otrepyev
@2moderatorа у вас тут как обычно - мультиаккаунтом балуются.
DrArgentum
Я напомню, что аккаунт с которой была написана статья - не мой, а заказчика. У меня один аккаунт на Хабре - и это drargentum