Кобра: создаем OSINT инструмент на Python, часть 1


Итак, каждый программист желает все автоматизировать — и не только программист. В этой статье мы рассмотрим создание OSINT-инструмента на Python.


Я хочу предупредить, что все показано в целях ознакомления, данные человека не подлежат хранению или распространению без соответствующего разрешения.


В этой статье мы создадим инструмент для:


  • Получения информации об IP адресе
  • Получения информации об номере телефона
  • Получение информации, есть ли IP в черных листах DNS
  • Парсер всех ссылок с сайта
  • Сканер портов
  • SYN-сканер портов
  • Сканер сессий Meterpreter для Windows 7/10
  • Virus Total API
  • Сканер SQL инъекций
  • Сканер XSS уязвимостей
  • Генератор фейкового User-Agent
  • Скрипт для изменения mac-адреса

Эта статья — первая часть, здесь мы рассмотрим минимум, а в следующей части доделаем код, улучшим его и сделаем полноценный CLI-инструмент, всю информацию по обновлению кода и уведомления о новых частях буду публиковать у себя в канале.


Данная статья ориентирована на продвинутых разработчиков, которые уже умеют писать и читать python-код.


Для всего этого нам нужен будет Python >3.9, Linux и пакетный менеджер pip.


Весь исходный код доступен по ссылке. Репозиторий локализирован на английский язык, в директории /docs/ru есть markdown-версия статьи.


Создаем рабочее окружение


Никогда вам не советую устанавливать пакеты в саму систему. Для того чтобы работать над каждым проектом отдельно (дабы не возникало ошибок с пакетами и библиотеками) надо создавать виртуальные окружения.


Для того, чтобы создать виртуальное окружение, нам потребуется всего лишь одна команда:


python3 -m venv <название виртуального окружения>
# Например, создание виртуального окружения venv
python3 -m venv venv

После ввода данной команды у вас в текущем проекте будет создана новая директория — это и будет виртуальное окружение.


Но это еще не все — нам предстоит активировать его командой:


source venv/bin/activate
# Или, если у вас fish:
source venv/bin/activate.fish

И вот теперь мы можем спокойно работать с проектом и устанавливать пакеты. Для деактивации и выхода из окружения можно использовать команду deactivate.


Установка зависимостей


Для начала, давайте установим пакет whois. Он потребуется для того, чтобы мы могли узнать информацию об IP адресах.


sudo pacman -S whois            # Arch
sudo apt install whois          # Debian/Ubuntu
sudo dnf install whois          # Fedora
sudo epm install whois          # Alt

После активации нашего окружения, можно установить python-пакеты. Список нужных нам можно получить по ссылке в файле requirements.txt. Просто скачиваете его в проект и выполняете следующию команду:


pip3 install -r requirements.txt

Этим мы установили нужные нам зависимости. Теперь можно начать разработку!


Архитектура проекта


Давайте создадим архитектуру нашего инструмента:


  • Директория core — базовый функционал для самого инструмента
  • Директория modules — главная папка всего проекта, здесь мы будем хранить модули для работы.
    • anonymity — модуль анонимности, безопасности. Небольшой, чаще всего вспомогательный. Есть пока два компонента — генерация фейкового юзер-агента (fakeuseragent.py) и изменение MAC-адреса устройства (machanger.py)
    • network_base — вспомогательно-информационный модуль. Содержит информацию о сети, IP и портах. Один из самых больших модулей. Содержит следующие файлы: адрес из гео-координат, домен из IP, IP из домена, небольшие программы для работы с IPv4, ваш публичный IP, параметры сети, скрипт для пинга адреса, какой сервис на порту и т.д.
    • osint — модуль, как не странно, для OSINT и поиска информации. Здесь уже содержится прокси-файл для взаимодействия с модулем network_base, а также скрипты для получения информации об ip и номере телефона.
    • scanners — модуль для сканеров, как понятно из названия. Здесь есть сканер IP на наличие в черных списках DNS, парсер ссылок с сайта, сканер портов, поиск SQL-Injection и XSS уяизвимостей.
  • Файл main.py — главный файл кода в корневом каталоге проекта.

Итак, в репозитории директория core является хранилищем трех файлов — это


  • highlight_schemes.py — цветовые схемы Pygments.
  • logger.py — файл с классом логгера, дебаггера объектов.
  • style.py — файл с цветами, для того чтобы делать вывод красивее.

К сожалению, эти файлы мы не будем затрагивать — это совершенно другая тема. Если вы хотите, следующая статья будет об создании продвинутых CLI-программ на Python. А пока продолжим.


В репозитории modules есть огромное количество файлов и каталогов, я разберу здесь большую часть.


Создание модулей


Займемся созданием модулей. Начнем с модуля анонимности


Генерация фейкового User-Agent


Для этого мы будем использовать библиотеку fake-useragent.


from fake_useragent import UserAgent

def generate_useragent() -> str:
    fua = UserAgent()

    return str(fua.random)

Здесь всего лишь одна функция — генерации случайного юзер-агента.


Изменение MAC-адреса


Для этого нам не нужны будут дополнительные библиотеки. Но вам возможно нужно будет установить набор инструментов net-tools. Этот набор содержит в себе утилиту ifconfig, аналог ipconfig в Windows, который нам нужен будет для управления сетями.


import re
import subprocess
from random import choice

def ifconfig():
    """Возвращаем вывод команды ifconfig"""
    output = subprocess.check_output(["sudo", 'ifconfig'])

    return str(str(output.decode('utf-8')).replace(r'\n', '\n'))

def change_mac(interface: str, new_mac_address: str) -> None:
    """Функция для изменения MAC-адреса при помощи утилиты ifconfig

    Аргументы:
     + interface: str - название интерфейса (ex. wlan0, wlp1s0, wlp3s0, eth)
     + new_mac_address - значение нового mac-адреса"""
    print(f'[+] Выключение {interface}')
    subprocess.call(["doas", "ifconfig", interface, "down"])
    print(f'[+] Замена MAC-адреса {interface} на {new_mac_address}')
    subprocess.call(["doas", 'ifconfig', interface, 'hw', 'ether', new_mac_address])
    print(f'[+] Включение {interface}')
    subprocess.call(["doas", "ifconfig", interface, "up"])

def get_random_mac_address() -> str:
    """Функция генерации и получения случайного mac-адреса.
    У нас есть словарь с числами от 0 до 9, и латинских букв от a до f, а также начало
    нового mac-адреса (00). После мы проходимся в итерационном цикле и добавляем 5 раз еще 
    по паре символов, а после возвращаем

    Возвращает:
     + str - новый mac-адрес"""
    characters = list("1234567890abcdef")
    random_mac_address = "00"

    for i in range(5):
        random_mac_address += ':' + choice(characters) + choice(characters)

    return random_mac_address

def get_current_mac(interface: str):
    """Получаем текущий MAC-адрес при помощи утилиты ifconfig"""
    output = subprocess.check_output(["doas", "ifconfig", interface])

    return re.search(r"\w\w:\w\w:\w\w:\w\w:\w\w:\w\w", str(output)).group(0)

def main(interface: str):
    new_mac_address = get_random_mac_address()

    try:
        current_mac = get_current_mac(interface)
    except subprocess.CalledProcessError as ex:
        print(f'[!] Произошла ошибка. Скорее всего {interface} не существует')
        print(f' !-> {ex}')
        return

    print(f'[+] Текущий MAC-адрес: {current_mac}')
    print(f'[+] Замена MAC-адреса на {new_mac_address}')
    change_mac(interface, new_mac_address)

Модуль network_base


Этот модуль, как я уже говорил, нужен для работы с сетью. Но весь код занимает слишком много места, поэтому посмотреть вы его можете по ссылке.


Обязательно советую использовать этот код, ибо он требуется для получения информации об IP. Можете просто скопировать код и изучить его, он обильно документирован.


Там вы увидите множество файлов — все они пригодятся.


Модуль OSINT


Теперь займемся самим поиском информации по открытым источникам. Займемся файлом ip.py — он поведует нам некоторую информацию об IP адресе.


import os
import requests

def get_info_about_ip(ipaddr: str, fua: str) -> tuple:
    """Получаем информацию об IP адресе

    Аргументы:
     + ipaddr: str - IP адрес
     + fua: str - фейковый юзер-агент"""
    result = str()

    try:
        # Отправляем запрос
        headers = {
            'User-Agent': fua
        }
        info_data = requests.get(f'https://ipinfo.io/{ipaddr}/json', headers=headers).json()
    except Exception as ex:
        # При ошибке возвращаем саму ошибку.
        return ex

    # Получаем информацию об IP
    whois_info = os.popen(f'whois {ipaddr}').read().strip()

    result += f'IP: {info_data.get("ip")}\n'
    result += f'City: {info_data.get("city")}\n'
    result += f'Region: {info_data.get("region")}\n'
    result += f'Country: {info_data.get("country")}\n'
    result += f'Hostname: {info_data.get("hostname")}\n'
    result += f'JSON data: {info_data}\n'
    result += f'WhoIS: {whois_info}\n'

    # возвращаем результат
    return result

Следующий шаг — компонент netlib.py. Он как раз и будет неким посредником между модулем OSINT и модулем network_base.


import modules.network_base.ipv4_local_cli as ipv4_cli
import modules.network_base.ipv4_local_getmac as ipv4_gm
import modules.network_base.ipv4_local_sock as ipv4_sock

import modules.network_base.router_ip as getaway

import modules.network_base.network_params as net_param

import modules.network_base.ip_from_domain as ip_domain
import modules.network_base.domain_from_ip as domain_ip
import modules.network_base.service_on_port as serv_port

import modules.network_base.my_public_ip as public_ip
import modules.network_base.ping_address as ping_addr
import modules.network_base.geolocation_ip as geo_ip
import modules.network_base.addr_from_geo as addr_geo

def url_info(url):
    """Информация об URL.

    URL автоматически ретранслируется в IP"""

    # Пинг адреса
    print(f'Ping domain of address: {ping_addr.ping_addr(ip_domain.ip_from_domain(f"{url}"))}')
    # Координаты
    print(f"Coords by IP address: {geo_ip.geo_ip(domain_ip.domain_ip(ip_domain.ip_from_domain(f'{url}')))}")
    # Физический адрес по координатам
    print(f"Physical address by coords: "
        f"{addr_geo.get_addr(geo_ip.geo_ip(domain_ip.domain_ip(ip_domain.ip_from_domain(f'{url}'))))}")
    # IP из домена
    print(f"IP-address of domain {url}: {ip_domain.ip_from_domain(f'{url}')}")
    # Домен из IP
    print(f"Domain name {url} by IP: {domain_ip.domain_ip(ip_domain.ip_from_domain(f'{url}'))}")

def ip_info(ip: str) -> str:
    """Информация об IP"""
    print(f'Ping domain or address: {ping_addr.ping_addr(ip)}')
    print(f"Coords by IP: {geo_ip.geo_ip(ip)}")
    print(f"Geo Addr Coords by IP: {addr_geo.get_addr(geo_ip.geo_ip(domain_ip.domain_ip(ip)))}")
    print(f"Domain name: {domain_ip.domain_ip(ip)}")

def check_network():
    """Проверяем сетевые параметры"""

    # Локальный IP
    print(f'Local IP (cli): {ipv4_cli.local_ipv4()}')
    print(f'Local IP (gm): {ipv4_gm.local_ipv4()}')
    print(f'Local IP (sock): {ipv4_sock.local_ipv4()}')

    # IP шлюза
    print(f'IP router: {getaway.router_ip()}')

    # Параметры сети
    print(f'Network interface parameters:\n{net_param.network_param()}')

    # Название сервиса работающего на 80 порту
    print(f'Name of service work on port 80: {serv_port.type_port(80)}')

    # Публичный IP
    print(f'Public IP: {public_ip.public_ip()}')

Следующий компонент — phone.py, который отвечает за небольшой поиск информации об номере телефона.


import requests

def get_info_phonenumber(phonenumber, fua):
    """Получение информации о номере телефона

    Аргументы:
     + phonenumber - номер телефона
     + fua - фейковый юзер-агент"""
    result = ''
    try:
        # Делаем запрос к API
        url = f"https://htmlweb.ru/geo/api.php?json&telcod={phonenumber}"
        headers = {
            'User-Agent': fua
        }
        info_data = requests.get(url, headers=headers).json()
    except Exception as ex:
        return ex

    result += f'Country: {info_data["country"]["name"]}\n'
    result += f'Region: {info_data["region"]["name"]}\n'
    result += f'Subregion: {info_data["region"]["okrug"]}\n'
    result += f'Operator: {info_data["0"]["oper"]}\n'
    result += f'Location: {info_data["country"]["location"]}\n'

    return result

После этого можно заняться следующим компонентом — whois_information.py:


#!venv/bin/python3
import socket
import time
from ipaddress import IPv4Address, AddressValueError
from datetime import datetime
import ipwhois
import whois

def ipwhois_info(ip: str):
    """Информация о IP по IPWhois

     + ip: str - IP адрес"""
    results = ipwhois.IPWhois(ip).lookup_whois()
    print(results)
    print("\n")

def whois_info(ip):
    """Информация о IP по WhoIs

     + ip: str - IP адрес"""
    results = whois.whois(ip)
    print(results)

def ianna(ip):
    """Информация о IP через whois.iana.org

     + ip: str - IP адрес"""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(("whois.iana.org", 43))
    s.send((ip + "\r\n").encode())
    response = b""

    while True:
        data = s.recv(4096)
        response += data
        if not data:
            break

    s.close()
    whois = ''

    for resp in response.decode().splitlines():
        if resp.startswith('%') or not resp.strip():
            continue
        elif resp.startswith('whois'):
            whois = resp.split(":")[1].strip()
            break

    return whois if whois else False

def get_whois(ip, whois):
    """Получения информации о IP"""
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect((whois, 43))
    s.send((ip + "\r\n").encode())
    response = b""

    while True:
        data = s.recv(4096)
        response += data
        if not data:
            break

    s.close()
    whois_ip = dict()
    num = 0

    for ln in response.decode().splitlines():
        if ln.strip().startswith("%") or not ln.strip():
            continue
        else:
            if ln.strip().split(": ")[0].strip() in ['created', 'last-modified']:
                dt = datetime.fromisoformat(ln.strip().split(": ")[1].strip()).strftime("%Y-%m-%d %H:%M:%S")
                whois_ip.update({f'{ln.strip().split(": ")[0].strip()}_{num}': dt})
                num += 1
            else:
                whois_ip.update({ln.strip().split(": ")[0].strip(): ln.strip().split(": ")[1].strip()})

    return whois_ip if whois_ip else False

def validate_request(ip):
    """Проверка IP

     + ip - IP адрес"""
    try:
        IPv4Address(ip)
        if whois := ianna(ip):
            time.sleep(1)
            if info := get_whois(ip, whois):
                print(info)
            else:
                print("Не была получена информация")
        else:
            if info := get_whois(ip, 'whois.ripe.net'):
                print(info)
            else:
                print("Не была получена информация")
    except AddressValueError:
        print("IP адрес не валидный")
    except ConnectionResetError as ex:
        print(ex)

После этого давайте займемся предпоследним компонентом модуля OSINT — virus_total.py для анализа бинарников на вирусы, трояны и другие малвари.


Для начала вам потребуется авторизоваться на сайте Virus Total. После активации аккаунта, откройте меню пользователя и перейдите в секцию API Key. Ниже вы увидете сам API ключ:



Не забудьте про ограничения бесплатного API-ключа:


  • Частота запросов — 4 поиска / минута
  • Дневная квота — 500 поисков / день
  • Месячная квота — 15.5 K поисков / месяц

После просто скопируйте и вставьте в код. Есть разные практики хранить важные ключи в коде. Давайте рассмотрим три способа:


  1. Первый и простой способ. Просто вставить ключ прямо в коде

APIKEY='123456abcdef'

# Здесь ваш код
# ...

Но я категорически не советую так делать. Если вы забудете удалить ваш ключ, то он может оказаться в руках других людей. Они смогут получить доступ к вашему проекту, боту, аккаунта и т.д., в зависимости от возможностей API. Поэтому так делайте только в локальных проектах (да и даже так лучше не делать по стилю кода).


  1. Создать файл настроек/конфигов, например settings.py и в нем хранить все важные ключи, а после импортировать в проект. Намного лучше чем 1 способ, но проблемы с безопасностью также остаются.


  2. Переменные окружения. Безопасный способ, мы храним API не в коде, а в переменных окружения. Этот способ мы и будем использовать.



Вам будет надо установить пакет python-dotenv:


pip install python-dotenv

И после создайте в корневом каталоге файл .env, и в него вставьте следующий код:


VIRUS_TOTAL_KEY = "ваш ключ"

После этого давайте немного изменим концепцию, и создадим микро-антивирус. Он будет сперва проверять хеш файла в базе сигнатурах. Да, просто сопоставлять, и это не будет стоять рядом даже с самыми плохими антивирусами — ведь стоит добавить в бинарник малваря хотя-бы один байт, даже если это будет 0, то уже наш антивирус ничего не заподозрит. Но если малварь и пройдет сигнатурный анализ, то дальше его ждет Virus Total!


И еще, мы не будем сами посылать запросы, и парсить результат, мы будем использовать библиотеку vt-py.


А также для локальной проверки файлов нам нужны будут сигнатуры. Собрал я их в каталоге res, доступный по ссылке.


import os
import json
import requests
from dotenv import load_dotenv
import os
from time import perf_counter
from hashlib import sha256, sha1, md5
from colorama import Fore, Style
import vt
import json

# загружаем переменные окружения
load_dotenv()

# получаем API-ключ
VIRUS_TOTAL_API_KEY = os.getenv("VIRUS_TOTAL_KEY")

# Сигнатуры
signature_resources = ['res/signatures.txt', 'res/signatures2.txt',
                        'res/signatures3.txt', 'res/signatures4.txt']

# для своих требований, туда можно поместить хэши зловредов, которых нету в обычных списках сигнатур. А также описание и название.
signature_resource_info = 'res/signatures_info.json'

def add_signature(new_signature: dict) -> None:
    """Добавление сигнатуры в базу"""
    with open(signature_resource_info, 'r') as file:
        signatures_info = json.load(file)

    for signature in new_signatures:
        signatures_info['name'] = new_signatures['name']
        signatures_info['desc'] = new_signatures['desc']
        signatures_info['date'] = new_signatures['date']

    with open(signature_resource_info, 'a') as file:
        json.dump(signatures_info, file, indent=4)

    return None

def rewrite_signatures(signatures: dict) -> bool:
    """Перезапись БД сигнатур"""
    try:
        with open(signature_resource_info, 'w') as file:
            json.dump(signatures, file, indent=4)
    except Exception as e:
        return False
    else:
        return True

def get_info_signature(signature: str) -> str:
    """Получение информации об сигнатуре из БД"""
    with open(signature_resource_info, 'r') as file:
        signatures_info = json.load(file)

    try:
        signature_info = f'''Сигнатура: {signature}
Название угрозы: {signatures_info[signature]["name"]}
Описание угрозы: {signatures_info[signature]["desc"]}
Дата обнаружения угроза: {signatures_info[signature]["date"]}'''
    except IndexError:
        signature_info = f'По сигнатуре {signature} еще нету информации в наших базах данных'
    finally:
        return signature_info

def scan_file(filename: str, delete_file: bool, signature_resources: list, VT_API_KEY: str) -> None:
    """Сканируем файл

     + filename: str - путь до файла
     + delete_file: bool - удалять ли файл
     + signature_resources: list - список ресурсов сигнатур
     + VT_API_KEY: str - API ключ Virus Total"""
    result = f'Сканируем {filename} на наличие угроз...\n'

    try:
        print('Load singatures...')
        start = perf_counter()
        shahash = sha256()
        sha1hash = sha1()
        md5hash = md5()

        # Получаем хэши
        with open(filename, 'rb') as file:
            while True:
                data = file.read()
                if not data:
                    break
                shahash.update(data) # sha256
                sha1hash.update(data) # sha1
                md5hash.update(data) #md5

            result1 = shahash.hexdigest()
            result2 = sha1hash.hexdigest()
            result3 = md5hash.hexdigest()
            result += f'Проверяем наличие хешей "{result1}", "{result2}", "{result3}" (sha256, sha1, md5) в сигнатурах...\n'

        # Читаем сигнатуры
        with open(signature_resources[0], 'r') as r:
            signatures = list(r.read().split('\n'))

        with open(signature_resources[1], 'r') as r:
            for sign_hash2 in list(r.read().replace(';', '').split('\n')):
                signatures.append(sign_hash2)

        with open(signature_resources[2], 'r') as r:
            for sign_hash3 in list(r.read().replace(';', '').split('\n')):
                signatures.append(sign_hash3)

        with open(signature_resources[3], 'r') as r:
            for sign_hash4 in list(r.read().replace(';', '').split('\n')):
                signatures.append(sign_hash4)

        result += f'''Сканирование {filename} на Virus Total...\n'''

        print('Loading Virus Total...')
        client = vt.Client(VT_API_KEY)

        # анализ файла
        with open(filename, "rb") as f:
            analysis = client.scan_file(f, wait_for_completion=True)
            file = client.get_object(f"/files/{result1}")
            stats = file.last_analysis_stats

            result += f'VirusTotal: {filename}\t size: {file.size} bytes\n'
            result += f'Безвредный: {stats["harmless"]}\n'
            result += f'Неподдерживаемый тип: {stats["type-unsupported"]}\n'
            result += f'Подозрительный: {stats["suspicious"]}\n'
            result += f'Отказ: {stats["failure"]}\n'
            result += f'Злонамеренный: {stats["malicious"]}\n'
            result += f'Безопасный: {stats["undetected"]}\n'

        print('Get final info...')

        if result1 in signatures or result2 in signatures or result3 in signatures:
            end = perf_counter()
            total = end - start
            result += f'Найдена угроза в файле {filename} (поиск по сигнатурам)\n'

            # получаем информацию о файле, если он есть в сигнатурах
            if result in signatures:
                info = get_info_signature(result)
                result += f'Информация: {info}\n'
            elif result2 in signatures:
                info = get_info_signature(result2)
                result += f'Информация: {info}\n'
            elif result3 in signatures:
                info = get_info_signature(result3)
                result += f'Информация: {info}\n'

            # удаление файла
            if delete_file:
                os.remove(filename)
                result += f'[!] Файл {filename} удален!\n'

            result += f'Время работы: {(total):.07f}s\n'
        else:
            end = perf_counter()
            total = end - start
            result += 'Угроз не найдено\n'
            result += f'Время работы: {(total):.07f}s\n'
    except FileNotFoundError:
        result += f'[!] Файл не найден\n'
    except PermissionError:
        result += f'[!] Ошибка прав доступа к файлу\n'

    print('End')

    return result

res = scan_file('<любой файл>', False, signature_resources, VIRUS_TOTAL_API_KEY)
print(res)

Вот и все — при запуске этого скрипта мы можем спокойно анализировать файлы на Virus Total.


Модуль scanners


Модуль всеразличных сканеров и парсеров. ПОка есть 6 — поиск IP в черных списках DNS, парсер ссылок, сканер портов, сканер на SQL-Injection и XSS уязвимости. А также есть еще детектор ARP-спуфинга и сканер сессий meterpreter для Windows.


Займемся детектором ARP спуфинга. ARP-spoofing — разновидность сетевой атаки типа MITM, применяемая в сетях с использованием протокола ARP. В основном применяется в сетях Ethernet. Атака основана на недостатках протокола ARP.


Нам нужен будет модуль scapy.


from scapy.all import sniff

class ARPSpoofingDetector:
    def __init__(self):
        self.IP_MAC_Map = {}

    def process_packet(self):
        src_IP = packet['ARP'].psrc
        src_MAC = packet['Ether'].src

        if src_MAC in self.IP_MAC_Map.keys():
            if self.IP_MAC_Map[src_MAC] != src_IP :
                try:
                    old_IP = self.IP_MAC_Map[src_MAC]
                except:
                    old_IP = "Неизвестный"

                message = f'''ARP атака замечена
Это возможно со стороны машины с IP адресом {old_IP} для {src_IP}'''

                return message
            else:
                self.IP_MAC_Map[src_MAC] = src_IP

    def sniffing(self):
        # "Нюхаем" пакеты
        sniff(count=0, filter="arp", store=0, prn=self.process_packet)

arpspoof_detector = ARPSpoofingDetector()
arpspoof_detector.sniffing()

Следующий шаг — сканер meterpreter сессий для Windows 7 и 10.


import re
import subprocess
from typing import List, Dict

WIN_7_SIGNATURE = ["WINBRAND.dll", "WINHTTP.dll", "webio.dll", "SspiCli.dll", "cscapi.dll"]
WIN_10_SIGNATURE = ["rsaenh.dll", "netapi32.dll", "wkscli.dll", "psapi.dll", "cscapi.dll"]

REG_FOR_WINDOWS_VERSION = r'(?:Windows\s+)(\d+|XP|\d+\.\d+)'
REG_FOR_EXE_PROCESSES = r'(?<=\\r\\n)[A-Za-z]+\.exe\s+\d+'
REG_FOR_LOCAL_SOCKET = r'(?:TCP\s+)(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5})'
REG_FOR_REMOTE_SOCKET = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5})\s+[A-Z]+\s+(\d+)'

CMD_NETSTAT_COMMAND = 'netstat -aon |find /i "established"'
CMD_TASKLIST_COMMAND = "tasklist /M"

class MeterpreterScanner:
    def __init__(self):
        self._signatures: List[str] = []
        self._processes_with_signatures: List[str] = []
        self._processes_with_dynamic_port: List[str] = []
        self._suspicious_processes: Dict[str:List[str]] = {}

    def _check_windows_version(self) -> None:
        """Проверка версии windows - 7 или 10"""
        info = subprocess.check_output("systeminfo", shell=True)
        win = re.findall(REG_FOR_WINDOWS_VERSION, str(info))

        if win[0] == '10':
            self._signatures = WIN_10_SIGNATURE
        elif win[0] == '7':
            self._signatures = WIN_7_SIGNATURE
        else:
            print("[X] Только Windows 7 или Windows 10")

    def _search_process_with_dll(self, dll: str) -> None:
        """Поиск процессов с DLL библиотеками"""
        output_tasklist = subprocess.check_output(f"{CMD_TASKLIST_COMMAND} {dll}", shell=True)
        process_list = re.findall(REG_FOR_EXE_PROCESSES, str(output_tasklist))

        for process_info in process_list:
            process, process_PID = re.split(r'\s+', process_info)
            if process in self._suspicious_processes:
                self._suspicious_processes[f'{process}_{process_PID}'].append(dll)
            else:
                self._suspicious_processes[f'{process}_{process_PID}'] = [dll]

    def _check_suspicious_process(self) -> None:
        """Сканируем подозрительные процессы"""
        self._check_windows_version()

        for dll in self._signatures:
            self._search_process_with_dll(dll)
        for proc_info in self._suspicious_processes.items():
            proc_name, proc_dlls = proc_info
            if len(proc_dlls) == 5:
                print(f"[-] Найден подозрительный процесс : {proc_name}")
                self._processes_with_signatures.append(proc_name)
        if not self._processes_with_signatures:
            print("[+] Meterpreter сигнатура не найдена в памяти")

    def _scan_suspicious_ports(self) -> None:
        """Сканируем подозрительные порты"""
        scan_output = subprocess.check_output(CMD_NETSTAT_COMMAND, shell=True)
        local_sockets = re.findall(REG_FOR_LOCAL_SOCKET, str(scan_output))
        for l_socket in local_sockets:
            l_ip, l_port = l_socket.split(':')
            if int(l_port) >= 49152:
                if l_ip != "127.0.0.1":
                    victim_socket = f"{l_ip}:{l_port}"
                    data_with_suspicious_socket = scan_output.decode().split(victim_socket)
                    suspicious_info = re.findall(REG_FOR_REMOTE_SOCKET, data_with_suspicious_socket[1])[0]
                    suspicious_socket, suspicious_PID = suspicious_info
                    # порт 4444 используется по умолчанию в MSF и meterpreter
                    if int(suspicious_socket.split(':')[-1]) == 4444:
                        print(f"[!] Найдено MSF подключение: {suspicious_socket}")
                    print(f"[-] Подключение {victim_socket} к {suspicious_socket} "
                          f"использование динамический PID - {suspicious_PID}")
                    self._processes_with_dynamic_port.append(suspicious_PID)

    def finding_meterpreter_sessions(self):
        """Находим сессии meterpreter"""
        found = False
        self._check_suspicious_process()
        self._scan_suspicious_ports()

        # ищем процесс
        for proc in self._processes_with_signatures:
            proc_name, proc_PID = proc.split('_')
            if proc_PID in self._processes_with_dynamic_port:
                print(f'[!] Найдено совпадение в {proc_name} с PID {proc_PID}')
                found = True

        if not found:
            print(f"[+] Совпадений не найдено")

MeterpreterScanner().finding_meterpreter_sessions()

Теперь займемся сканером черных листов DNS, а точнее будем искать целевой IP в этих списках.


import socket
import re
from ipaddress import ip_network, ip_address
from dns import resolver
from requests import get, exceptions

def ip_in_range(ip, addr):
    """Сканирование IP"""
    if ip_address(ip) in ip_network(addr):
        return True
    return False

def cloudfare_detect(ip):
    """Сканирование в Cloudfare"""
    list_addr = ["104.16.0.0/12"]

    url = 'https://www.cloudflare.com/ips-v4'
    req = get(url=url)

    for adr in req.text.split("\n"):
        list_addr.append(adr)

    for addr in list_addr:
        detect = ip_in_range(ip, addr)
        if detect:
            return True
    return False

def public_ip():
    """Получить публичный IP"""
    try:
        return get('https://api.ipify.org/').text
    except exceptions.ConnectionError:
        return '127.0.0.1'

def dns_bl_check(ip):
    bad_dict = dict()
    result = str()
    req = get('https://raw.githubusercontent.com/evgenycc/DNSBL-list/main/DNSBL')
    read = req.text.splitlines()

    for serv in read:
        print(f'Checking... {serv}')
        req = f"{'.'.join(reversed(ip.split('.')))}.{serv.strip()}"
        try:
            resolv = resolver.Resolver()
            resolv.timeout = 5
            resolv.lifetime = 5
            resp = resolv.resolve(req, 'A')
            resp_txt = resolv.resolve(req, 'TXT')
            result += f'{serv.strip():30}: [BAD]\n'
            pattern = r'(?:https?:\/\/)?(?:[\w\.]+)\.(?:[a-z]{2,6}\.?)(?:\/[\w\.]*)*\/?'
            find = re.findall(pattern, str(resp_txt[0]))
            if len(find) == 0:
                find = ['No address']
            bad_dict.update({serv.strip(): f'{resp[0]} {find[0]}'})
        except resolver.NXDOMAIN:
            result += f'{serv.strip():30}: [OK]\n'
        except (resolver.LifetimeTimeout, resolver.NoAnswer):
            continue
        except Exception as ex:
            raise ex
            return ('error', ex)

    if len(bad_dict) > 0:
        result += f'{ip} is found in black list\n'

        for bad in bad_dict:
            result += f' - {bad:30} : {bad_dict[bad]}\n'
    else:
        result += 'IP is not found in black list\n'

    return (result, 0)

def check_ip_in_black_list(addr_input):
    result = ''
    result += f'Your public IP: {public_ip()}\n'

    if addr_input.lower() == "x":
        exit(0)

    ip = ''

    try:
        ip = socket.gethostbyname(addr_input)
    except socket.gaierror as ex:
        return ex

    if cloudfare_detect(ip):
        result += f'Cloudfare detected: {ip}\n'
    else:
        result += 'Cloudfare not detected\n'

    dnsbl_list = dns_bl_check(ip)

    if dnsbl_list[0] is None or dnsbl_list[0] == 'error':
        return result
    else:
        result += dnsbl_list[0]

    return result

check_ip_in_black_list('<IP адрес>')

Теперь парсер ссылок:


import requests
from urllib.parse import urlparse, urljoin
from bs4 import BeautifulSoup

internal_urls = set()
external_urls = set()

def is_valid(url):
    """Данная функция проверяет, валиден ли URL

    Аргументы:
        url - ссылка на страницу
    """
    parsed = urlparse(url)

    return bool(parsed.netloc) and bool(parsed.scheme)

def get_all_website_links(url):
    """Получаем все ссылки с сайта"""
    urls = set()
    domain_name = urlparse(url).netloc
    soup = BeautifulSoup(requests.get(url).content, "html.parser")

    for a_tag in soup.findAll("a"):
        href = a_tag.attrs.get("href")

        if href == "" or href is None:
            continue

        href = urljoin(url, href)

        parsed_href = urlparse(href)
        href = parsed_href.scheme + "://" + parsed_href.netloc + parsed_href.path

        if not is_valid(href):
            continue

        if href in internal_urls:
            continue

        if domain_name not in href:
            if href not in external_urls:
                print(f"[!] Внешняя ссылка: {href}")
                external_urls.add(href)
            continue

        print(f"[*] Внутренняя ссылка: {href}")
        urls.add(href)
        internal_urls.add(href)

    return urls

def crawl(url, max_urls=30):
    """Парсинг"""
    total_urls_visited = 0

    total_urls_visited += 1
    print(f"[*] Работа над {url}")
    links = get_all_website_links(url)

    for link in links:
        if total_urls_visited > max_urls:
            break
        crawl(link, max_urls=max_urls)

def get_links(url, max_urls=30):
    """Получаем ссылки"""
    crawl(url, max_urls)
    print("[+] Всего внутренних ссылок:", len(internal_urls))
    print("[+] Всего внешних ссылок:", len(external_urls))
    print("[+] Всего ссылок", len(external_urls) + len(internal_urls))
    print("[+] Всего проверенных ссылок:", max_urls)

Как видите, ничего сложного.


А мы едем дальше — следующий шаг это сканер портов:


import socket
from datetime import datetime

def scan_ports(hostname):
    start = datetime.now()

    ports = {
        20: "FTP-DATA", 21: "FTP", 22: "SSH", 23: "Telnet",
        25: "SMTP", 43: "WHOIS", 53: "DNS", 80: "http",
        115: "SFTP", 123: "NTP", 143: "IMAP", 161: "SNMP",
        179: "BGP", 443: "HTTPS", 445: "MICROSOFT-DS",
        514: "SYSLOG", 515: "PRINTER", 993: "IMAPS",
        995: "POP3S", 1080: "SOCKS", 1194: "OpenVPN",
        1433: "SQL Server", 1723: "PPTP", 3128: "HTTP",
        3268: "LDAP", 3306: "MySQL", 3389: "RDP",
        5432: "PostgreSQL", 5900: "VNC", 8080: "Tomcat", 10000: "Webmin"
    }

    open_ports = []
    closed_ports = []
    [i for i in range(1, 65355)]

    ip = socket.gethostbyname(hostname)

    for port in range(65535):
        cont = socket.socket()
        cont.settimeout(1)

        # попытаемся подключиться
        try:
            cont.connect((ip, port))
        except socket.error:
            print(f'[!] Порт {port} закрыт')
            closed_ports.append([port])
        else:
            # выводим тип порта
            try:
                print(f"[{socket.gethostbyname(ip)}:{str(port)}] открыт/{ports[port]}")
                open_ports.append({port, ports[port]})
            except KeyError:
                print(f"[{socket.gethostbyname(ip)}:{str(port)}] открыт/неизвестный тип")
                open_ports.append({port, 'неизвестный тип'})
            finally:
                cont.close()

    ends = datetime.now()
    print("<Время работы:{}>".format(ends - start))

    print('[+] Закрытые порты: ')
    for closed_port in closed_ports:
        print(closed_port, end=', ')

    print()

    print('[+] Открытые порты: ')
    for open_port in open_ports:
        print(open_port)

scan_ports('<IP адрес>')

Следующий шаг — поиск SQL-Injection уязвимостей. Внедрение SQL-кода — один из распространённых способов взлома сайтов и программ, работающих с базами данных, основанный на внедрении в запрос произвольного SQL-кода.


import requests
from bs4 import BeautifulSoup as bs
from urllib.parse import urljoin
from pprint import pprint

def get_session(user_agent):
    session = requests.Session()
    session.headers["User-Agent"] = user_agent

    return session

def get_all_forms(session, url):
    """Дается `url`, и это возвращаем весь html-контент с формами"""
    soup = bs(session.get(url).content, "html.parser")

    return soup.find_all("form")

def get_form_details(form):
    """
    Эта функция получает всю доступную информацию о форме
    """
    details = {}

    # получаем действие формы (url цели)
    try:
        action = form.attrsession.get("action").lower()
    except:
        action = None

    # получаем метод формы (POST, GET, etc.)
    method = form.attrsession.get("method", "get").lower()

    # получаем все детали ввода
    inputs = []

    for input_tag in form.find_all("input"):
        input_type = input_tag.attrsession.get("type", "text")
        input_name = input_tag.attrsession.get("name")
        input_value = input_tag.attrsession.get("value", "")
        inputsession.append({"type": input_type, "name": input_name, "value": input_value})

    # добавляем информацию в словарь деталей
    details["action"] = action
    details["method"] = method
    details["inputs"] = inputs

    return details

def is_vulnerable(response):
    """Простая логическая функция, определяющая, является ли страница
    уязвима ли SQL-инъекция из-за ее «ответа»"""

    errors = {
        # MySQL
        "you have an error in your sql syntax;",
        "warning: mysql",
        # SQL Server
        "unclosed quotation mark after the character string",
        # Oracle
        "quoted string not properly terminated",
    }

    for error in errors:
        # Если мы находим ошибку, то возвращаем True
        if error in response.content.decode().lower():
            return True

    # Нету ошибок
    return False

def scan_sql_injection(session, url):
    """Сканирование URL на нахождение уяизвимостей для SQL-инъекции"""

    for c in "\"'":
        # Добавляем кавычку к URL
        new_url = f"{url}{c}"
        print("[!] Попытка нахождения уязвимости SQL-инъекции на", new_url)
        # Создаем HTTP-запрос
        res = session.get(new_url)
        if is_vulnerable(res):
            # SQL-инъекция обнаружена на самом URL-адресе, 
            # нет необходимости предварительно извлекать формы и отправлять их
            print("[+] SQL инъекция найдена, ссылка:", new_url)
            return

    # Анализ форм
    forms = get_all_forms(session, url)
    print(f"[+] Найдено {len(forms)} форм на {url}.")

    for form in forms:
        form_details = get_form_details(form)
        for c in "\"'":
            # данные тела для отправки
            data = {}
            for input_tag in form_details["inputs"]:
                if input_tag["type"] == "hidden" or input_tag["value"]:
                    # любая форма ввода, которая скрыта или имеет некоторое значение,
                    # просто используем его в теле формы
                    try:
                        data[input_tag["name"]] = input_tag["value"] + c
                    except:
                        pass
                elif input_tag["type"] != "submit":
                    # все остальные, кроме отправки, используют некоторые ненужные данные со специальным символом
                    data[input_tag["name"]] = f"test{c}"

            # добавляем к URL ссылку с действием из формы
            url = urljoin(url, form_details["action"])

            if form_details["method"] == "post":
                res = session.post(url, data=data)
            elif form_details["method"] == "get":
                res = session.get(url, params=data)

            if is_vulnerable(res):
                print("[+] SQL Injection vulnerability detected, link:", url)
                print("[+] Form:")
                pprint(form_details)
                break

def scanning(fua, url):
    """fua - юзер агент, url - ссылка на сайт"""
    session = get_session(fua)
    scan_sql_injection(session, url)

Займемся поиском XSS-уязвимостей (межсайтовый скриптинг). XSS — тип атаки на веб-системы, заключающийся во внедрении в выдаваемую веб-системой страницу вредоносного кода и взаимодействии этого кода с веб-сервером злоумышленника. Является разновидностью атаки «Внедрение кода». Википедия


import requests
from pprint import pprint
from bs4 import BeautifulSoup as bs
from urllib.parse import urljoin

def get_all_forms(url, fua):
    """Вводится `url`, это возвращает все формы из HTML-контента
    fua - фейковый юзер агент"""
    headers = {
        'User-Agent': fua
    }

    soup = bs(requests.get(url, headers=headers).content, "html.parser")
    return soup.find_all("form")

def get_form_details(form):
    """Данная функция получает все возможные данные из HTML-формы"""
    details = {}
    # Получаем действие формы
    action = form.attrs.get("action", "").lower()

    # Получаем все методы формы (POST, GET, etc.)
    method = form.attrs.get("method", "get").lower()

    # Получаем все инпуты
    inputs = []

    for input_tag in form.find_all("input"):
        input_type = input_tag.attrs.get("type", "text")
        input_name = input_tag.attrs.get("name")
        inputs.append({"type": input_type, "name": input_name})

    # Добавляем в детали
    details["action"] = action
    details["method"] = method
    details["inputs"] = inputs

    return details

def submit_form(form_details, url, value):
    """
    Отправляем данные формы
    Возвращает HTTP ответ
    """
    target_url = urljoin(url, form_details["action"])
    inputs = form_details["inputs"]
    data = {}

    for input in inputs:
        # заменить весь текст и значения поиска на `value`
        if input["type"] == "text" or input["type"] == "search":
            input["value"] = value

        input_name = input.get("name")
        input_value = input.get("value")

        if input_name and input_value:
            data[input_name] = input_value

    print(f"[+] Отправка вредоносной полезной нагрузки на {target_url}")
    print(f"[+] Данные: {data}")

    if form_details["method"] == "post":
        return requests.post(target_url, data=data)
    else:
        # GET request
        return requests.get(target_url, params=data)

def scan_xss(url, fua):
    """
    Получив `url`, он выводит все формы, уязвимые для XSS, и
    возвращает True, если кто-то из них уязвим, False в противном случае
    """
    forms = get_all_forms(url, fua)

    print(f"[+] Найдено {len(forms)} форм на {url}.")

    js_script = "<Script>alert('Detected')</script>"
    is_vulnerable = False

    for form in forms:
        form_details = get_form_details(form)
        content = submit_form(form_details, url, js_script).content.decode()
        if js_script in content:
            print(f"[+] XSS найдена на {url}")
            print("[*] Детали формы:")
            pprint(form_details)
            is_vulnerable = True

    return is_vulnerable

if __name__ == "__main__":
    url = "https://xss-game.appspot.com/level1/frame"
    print(scan_xss(url, '<юзер агент>'))

Вот и все. Осталось это скомбинировать в одну программу на Python, считайте что это ваше домашнее задание. Попробуйте улучшить код, улучшить вывод сообщений и многое другое. А как насчет асинхронности? Обо всем этом мы будем говорить во второй части статьи — там мы будем красоту наводить, и пытаться оптимизировать.


Заключение


Итак, в этой статье мы смогли написать набор OSINT (да и не только OSINT) инструментов. Оставляйте ваши комментарии, критику. Я понимаю, что всем не угодить, и вы можете высказаться в комментариях.


Я хочу предупредить, что весь мой код был скоплен за несколько месяцев кодинга. Если вы обнаружили код, который был уже в другой статье, прошу оставить комментарий, я внесу источник в статью. Благодарю за понимание!


Больше Python кода, гайдов и полезных инструментов и гайдов вы можете найти в моем канале, заходите в гости.


Спасибо за внимание! Надеюсь вам понравилась статья!


Ссылки


Комментарии (14)


  1. DrArgentum
    25.05.2024 13:02

    А мне нравится, код вполне приятный, да и местами полезный.


    1. andrezh
      25.05.2024 13:02
      +6

      Так это же ваша статья и ваш код, судя по гитхабу AlexeevDeveloper.

      Код, кстати, плохой.


      1. DrArgentum
        25.05.2024 13:02

        А расскажите пожалуйста почему код плохой? Я использовал анализатор ruff и пытаюсь соблюдать pep 8

        А если вы про то, что сканеры xss уязвимостей и SQL инъекций слишком простые и банальные, то я с вами буду согласен


        1. andrezh
          25.05.2024 13:02

          А расскажите пожалуйста почему код плохой? Я использовал анализатор ruff и пытаюсь соблюдать pep 8

          Структура, стиль. Все плохо. Линтер навыков не добавляет. Советую использовать Copilot.

          p.s. А разве на Хабре разрешено клоноводство для рекламы телеграм каналов?


          1. DrArgentum
            25.05.2024 13:02

            Я написал статью, но я не владелец телеграм канала


    1. Grigory_Otrepyev
      25.05.2024 13:02
      +5

      @2moderatorа у вас тут как обычно - мультиаккаунтом балуются.


      1. DrArgentum
        25.05.2024 13:02

        Я напомню, что аккаунт с которой была написана статья - не мой, а заказчика. У меня один аккаунт на Хабре - и это drargentum


  1. vasyakolobok77
    25.05.2024 13:02
    +3

    Ощущение, что код и статью делал школьник с помощью chat-gpt:

    -- бессмысленный модуль для генерации юзерагента, который можно сгенерировать по шаблону кодом в одну строчку;

    -- бессмысленные замены \n на \n;

    -- топорная регулярка для MAC, навскидку более правильная и простая регулярка: [0-9a-f]{2}(:[0-9a-f]{2}){5}
    -- для генерации мака тоже топорный алгоритм вместо того чтобы сгенерить 6 рандом байт и потом сделать toHex;

    -- топорное и неговорящее именование переменных / функций;

    -- одновременное использование raise и return, а местами return error вместо raise;

    -- сделана проверка на 127.0.0.1 но забыто, что вся подсеть 127/8 является исключением, и кроме нее есть еще десяток подсетей, которые имеют особое назначение;

    вобщем идея может быть и интересная, но реализация на уровне "как не надо писать код"


    1. DrArgentum
      25.05.2024 13:02

      ChatGPT я никогда не использую для написания статей, а также для генерации кода. Максимум что я делал - это генерация регулярных выражений, ибо я в них ноль

      return error я использовал потому что мне не требовалось исключение.

      Спасибо за заметки, негативный опыт - тоже опыт


  1. Grigory_Otrepyev
    25.05.2024 13:02
    +3

    Для этого нам не нужны будут дополнительные библиотеки. Но вам возможно нужно будет установить набор инструментов net-tools. Этот набор содержит в себе утилиту ifconfig, аналог ipconfig в Windows, который нам нужен будет для управления сетями.

    OSINT framework focused on gathering information from free tools or resources.

    1 Никакого отношения OSINT к замене MAC не имеет
    2 Связка MAC-PORT приедет в ELK > SOC и последует звонок от СБ.

    {info_data.get("hostname")}

    не получите, что за школотронство.

    print(f'Name of service work on port 80: {serv_port.type_port(80)}')

    и что ?

    Займемся детектором ARP спуфинга. ARP-spoofing — разновидность сетевой атаки типа MITM, применяемая в сетях с использованием протокола ARP. В основном применяется в сетях Ethernet.

    И где вы в 2024 другие видели ?

    вы можете найти в моем канале,

    а, понятно.


  1. alfa41
    25.05.2024 13:02
    +2

    Автор забыл упомянуть, что у Virustotal огромнейшая база исторических DNS записей, доступная по тому же API. Это "маст-хэв" для OSINT. Dnskron, к примеру, одна из реализаций такого скрипта, способного последовательно вытащить сотни связанных доменов через связь типа "domain-ip"


  1. olegtsss
    25.05.2024 13:02

    В комментариях под статьей налито много негатива. Не сочтите, за еще одну ложку дегтя, но я бы хотел обратить внимание на саму идею создания подобного рода комбайнов. С одной стороны, это удобно, все под рукой. С другой стороны, куча зависимостей от сторонних ресурсов, которые со временем будут изменяться. Создание подобного рода инструментов вынуждает всегда их поддерживать в актуальном состоянии и желательно следить за изменениями на стороне внешних ресурсов. Да и учесть у себя все имеющиеся возможности не выйдет. Итого имеем сборник "простейших техник". Следовательно встает вопрос, а стоит ли создавать такого рода комбайны. Мой ответ, к которому я пришел со временем - не стоит.


    1. alfa41
      25.05.2024 13:02

      Придерживаюсь того же мнения. Один скрипт под одну задачу. Но в то же время есть смысл обьединять по типу обьекта, к примеру, IP или домен, и собирать данные с разных API в рамках одного скрипта. Как показывает практика, разные источники могут дополнять друга.


  1. elixirkmc
    25.05.2024 13:02

    Не очень понятно, что дает подобный просмотр ARP пакетов, если подключиться в сеть, в которой уже активен спуфер. В зависимости от порядка получения пакетов, можно перепутать корректный ответ с подменным.
    P.S. По коду: в обработчике пакета не указан никакой аргумент.