Зачем вообще «самописный» сканер

Nmap и masscan великолепны. Но в реальной жизни часто хочется мини-инструмент, который можно:

  • быстро встроить в свои пайплайны (CI, ночные проверки, health-чек задач),

  • тонко настроить под конкретную сеть/ограничения,

  • расширить под свои кейсы (например, сразу отправлять результат в Telegram/Prometheus/ELK).

В статье — практический разбор двух подходов: многопоточность на ThreadPoolExecutor и асинхронщина на asyncio. Плюс: баннер-граббинг, HTTP-проверка, простая TLS-детекция (версия протокола, ALPN), CLI, JSON/CSV-вывод и базовые приёмы «бережного» сканирования.

⚠️ Сканируйте только свои сети или с письменного разрешения владельца. Нагрузка на чужие хосты без согласования — плохая идея и, возможно, нарушение закона/политик.


Архитектура и выбор подхода

TCP connect-scan — самый «джентльменский» метод: мы делаем обычный connect(). В отличие от SYN-скана (нужны сырые сокеты/права root и лучше выбирать Scapy/nmap), connect-скан на чистом Python прозрачен и переносим.

  • Threads (ThreadPoolExecutor) — просто, предсказуемо, хорошо для LAN/небольших диапазонов.

  • asyncio — меньше накладных расходов при массовых коннектах, тонкий контроль параллелизма, удобные таймауты и бэкпрешер.

Я покажу оба. Для «продакшен-миникомбайна» внизу — полный asyncio-скрипт с CLI, форматами вывода и TLS-детекцией.


Быстрый вариант №1: ThreadPool + баннер-граббинг

Кейс: маленькая подсеть, хочется максимально «в лоб», без зависимостей.

import socket
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed

NETWORKS = ["10.16.14.5/32", "10.16.25.32/27"]
PORTS = range(1, 1025)     # пример: только well-known
CONNECT_TIMEOUT = 0.5
READ_TIMEOUT = 0.5
THREADS = 800

SERVICE_BANNERS = [
    (b"SSH-", "ssh"),
    (b"FTP", "ftp"),
    (b"SMTP", "smtp"),
    (b"POP3", "pop3"),
    (b"IMAP", "imap"),
    (b"HTTP/", "http"),
    (b"HTTP", "http"),
    (b"Redis", "redis"),
    (b"MySQL", "mysql"),
    (b"PostgreSQL", "postgresql"),
    (b"RTSP", "rtsp"),
    (b"Telnet", "telnet"),
    (b"LDAP", "ldap"),
    (b"RFB", "vnc"),
    (b"AMQP", "amqp"),
    (b"SMB", "smb"),
]

def expand_targets(networks):
    targets = []
    for item in networks:
        net = ipaddress.ip_network(item, strict=False)
        targets.extend(str(h) for h in net.hosts())
    return targets

def detect_banner(sock: socket.socket) -> bytes:
    try:
        sock.settimeout(READ_TIMEOUT)
        return sock.recv(512)
    except (socket.timeout, OSError):
        return b""

def detect_protocol(sock: socket.socket) -> str:
    b = detect_banner(sock)
    for sig, name in SERVICE_BANNERS:
        if sig in b:
            return name
    # HTTP HEAD как «активная» проба
    try:
        sock.sendall(b"HEAD / HTTP/1.0\r\n\r\n")
        sock.settimeout(READ_TIMEOUT)
        data = sock.recv(128)
        if data.startswith(b"HTTP/"):
            return "http"
    except (socket.timeout, OSError):
        pass
    return "unknown"

def scan_port(ip: str, port: int):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(CONNECT_TIMEOUT)
        if s.connect_ex((ip, port)) != 0:
            return None
        try:
            service = socket.getservbyport(port, "tcp")
        except OSError:
            service = "unknown"
        if service == "unknown":
            service = detect_protocol(s)
        return {"ip": ip, "port": port, "service": service}

def main():
    ips = expand_targets(NETWORKS)
    results = []
    with ThreadPoolExecutor(max_workers=THREADS) as pool:
        futures = (pool.submit(scan_port, ip, p) for ip in ips for p in PORTS)
        for f in as_completed(futures):
            r = f.result()
            if r:
                results.append(r)
                print(f'{r["ip"]}:{r["port"]} -> {r["service"]}')

if __name__ == "__main__":
    main()

Плюсы: мини-код, легко читать и отлаживать.
Минусы: при тысячах одновременных коннектов треды прожорливы, выше накладные.


Производственный вариант №2: asyncio + семафоры + TLS-детекция

Что добавим по сравнению с «быстрым» вариантом:

  • CLI (argparse): сети, порты (диапазоны 80,443,8000-8100), таймауты, параллелизм.

  • Параллелизм через семафор: честный контроль нагрузки.

  • Баннер-граббинг и HTTP-HEAD.

  • TLS-детекция: пытаемся выполнить TLS-рукопожатие, собираем версию протокола и ALPN.

  • Форматы выводаplain (текст), jsoncsv.

  • Мягкие таймауты и бэкпрешер, чтобы не «класть» сеть.

import argparse
import asyncio
import ipaddress
import json
import re
import ssl
import sys
from typing import Iterable, List, Tuple, Dict, Any

SERVICE_BANNERS: List[Tuple[bytes, str]] = [
    (b"SSH-", "ssh"), (b"FTP", "ftp"), (b"SMTP", "smtp"),
    (b"POP3", "pop3"), (b"IMAP", "imap"), (b"HTTP/", "http"),
    (b"HTTP", "http"), (b"Redis", "redis"), (b"MySQL", "mysql"),
    (b"PostgreSQL", "postgresql"), (b"RTSP", "rtsp"), (b"Telnet", "telnet"),
    (b"LDAP", "ldap"), (b"RFB", "vnc"), (b"AMQP", "amqp"), (b"SMB", "smb"),
]

TLS_PORTS_DEFAULT = {443, 8443, 9443, 7443, 10443}

def parse_ports(spec: str) -> List[int]:
    """
    Пример: "22,80,443,8000-8100"
    """
    ports = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            a, b = part.split("-", 1)
            ports.update(range(int(a), int(b) + 1))
        else:
            ports.add(int(part))
    return sorted(p for p in ports if 1 <= p <= 65535)

def expand_targets(net_specs: Iterable[str]) -> List[str]:
    ips: List[str] = []
    for it in net_specs:
        it = it.strip()
        if not it:
            continue
        # поддерживаем и одиночные ip
        if re.match(r"^\d+\.\d+\.\d+\.\d+$", it):
            ips.append(it)
            continue
        net = ipaddress.ip_network(it, strict=False)
        ips.extend(str(h) for h in net.hosts())
    return ips

async def try_read_banner(reader: asyncio.StreamReader, n: int, timeout: float) -> bytes:
    try:
        return await asyncio.wait_for(reader.read(n), timeout=timeout)
    except asyncio.TimeoutError:
        return b""
    except Exception:
        return b""

def detect_from_banner(b: bytes) -> str:
    for sig, name in SERVICE_BANNERS:
        if sig in b:
            return name
    return "unknown"

async def try_http_probe(reader, writer, timeout: float) -> bool:
    try:
        writer.write(b"HEAD / HTTP/1.0\r\nHost: localhost\r\n\r\n")
        await writer.drain()
        data = await asyncio.wait_for(reader.read(128), timeout=timeout)
        return data.startswith(b"HTTP/")
    except Exception:
        return False

async def try_tls(host: str, port: int, timeout: float) -> Dict[str, Any] | None:
    """
    Пытаемся установить TLS (SNI=host). Если успешно — вернём версию TLS и ALPN.
    """
    ctx = ssl.create_default_context()
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host=host, port=port, ssl=ctx, server_hostname=host),
            timeout=timeout
        )
        sslobj = writer.get_extra_info("ssl_object")
        info = {
            "tls": getattr(sslobj, "version", lambda: None)(),
            "alpn": getattr(sslobj, "selected_alpn_protocol", lambda: None)(),
        }
        writer.close()
        try:
            await writer.wait_closed()
        except Exception:
            pass
        return info
    except Exception:
        return None

async def scan_one(host: str, port: int, conn_timeout: float, read_timeout: float,
                   tls_guess: bool, sem: asyncio.Semaphore) -> Dict[str, Any] | None:
    async with sem:
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(host=host, port=port),
                timeout=conn_timeout
            )
        except Exception:
            return None

        result: Dict[str, Any] = {"ip": host, "port": port, "service": "unknown"}

        # 1) Пассивный баннер
        b = await try_read_banner(reader, 512, read_timeout)
        srv = detect_from_banner(b)
        if srv != "unknown":
            result["service"] = srv
        else:
            # 2) HTTP-проба
            ok = await try_http_probe(reader, writer, read_timeout)
            if ok:
                result["service"] = "http"

        writer.close()
        try:
            await writer.wait_closed()
        except Exception:
            pass

        # 3) TLS-детекция (по порту или по факту неизвестного сервиса)
        if tls_guess and (port in TLS_PORTS_DEFAULT or result["service"] == "unknown"):
            tls_info = await try_tls(host, port, read_timeout)
            if tls_info:
                result["service"] = "https"
                result.update(tls_info)

        # 4) getservbyport как подсказка (последним слоем)
        if result["service"] == "unknown":
            try:
                import socket
                result["service"] = socket.getservbyport(port, "tcp")
            except Exception:
                pass

        return result

async def run(ips: List[str], ports: List[int], conn_timeout: float, read_timeout: float,
              concurrency: int, tls_guess: bool, fmt: str) -> None:
    sem = asyncio.Semaphore(concurrency)
    tasks = [
        scan_one(ip, p, conn_timeout, read_timeout, tls_guess, sem)
        for ip in ips for p in ports
    ]

    if fmt == "json":
        out = []
        for coro in asyncio.as_completed(tasks):
            r = await coro
            if r:
                out.append(r)
        print(json.dumps(out, ensure_ascii=False, indent=2))
        return

    if fmt == "csv":
        print("ip,port,service,tls,alpn")
        for coro in asyncio.as_completed(tasks):
            r = await coro
            if r:
                print(f'{r["ip"]},{r["port"]},{r.get("service","")},{r.get("tls","")},{r.get("alpn","")}')
        return

    # plain
    for coro in asyncio.as_completed(tasks):
        r = await coro
        if r:
            extra = []
            if "tls" in r and r["tls"]:
                extra.append(f'TLS={r["tls"]}')
            if "alpn" in r and r["alpn"]:
                extra.append(f'ALPN={r["alpn"]}')
            suffix = f' ({", ".join(extra)})' if extra else ""
            print(f'{r["ip"]}:{r["port"]} -> {r["service"]}{suffix}')

def build_cli():
    ap = argparse.ArgumentParser(description="Async TCP port scanner with banner & TLS detection")
    ap.add_argument("-n", "--net", action="append", required=True,
                    help="Сеть/адрес: пример 10.0.0.0/24 или 10.0.0.5. Можно несколько флагов.")
    ap.add_argument("-p", "--ports", default="1-1024,3389,5432,6379,8080-8090,8443",
                    help="Порты: 22,80,443,8000-8100 (по умолчанию: популярные)")
    ap.add_argument("--conn-timeout", type=float, default=0.5, help="Таймаут установления TCP (сек)")
    ap.add_argument("--read-timeout", type=float, default=0.5, help="Таймаут чтения баннера/HTTP/TLS (сек)")
    ap.add_argument("-c", "--concurrency", type=int, default=1000, help="Параллелизм (кол-во одновременных коннектов)")
    ap.add_argument("--no-tls-guess", action="store_true", help="Отключить попытки TLS-детекции")
    ap.add_argument("-f", "--format", choices=["plain","json","csv"], default="plain", help="Формат вывода")
    return ap

def main():
    ap = build_cli()
    args = ap.parse_args()

    ips = expand_targets(args.net)
    ports = parse_ports(args.ports)

    try:
        asyncio.run(run(
            ips=ips, ports=ports,
            conn_timeout=args.conn_timeout,
            read_timeout=args.read_timeout,
            concurrency=args.concurrency,
            tls_guess=not args.no_tls_guess,
            fmt=args.format
        ))
    except KeyboardInterrupt:
        print("\nInterrupted.", file=sys.stderr)

if __name__ == "__main__":
    main()

Что получает читатель «поверх кода»

  • Управление нагрузкой--concurrency задаёт одновременные соединения, семафор гарантирует верхнюю границу.

  • Баннер-граббинг: читаем до 512 байт — хватает, чтобы поймать SSH/SMTP/Redis/… .

  • HTTP-проба: если сервер молчит — мягко «тычем» HEAD / и распознаём HTTP/….

  • TLS-детекция: пытаемся установить TLS (SNI=хост/IP). Если ок — получаем TLSv1.2/1.3 и ALPN (http/1.1h2).

  • Форматыplain для консоли, json и csv — для пайплайнов/импорта.


Как сканировать «бережно»

  • Подбирайте --concurrency под RTT и мощность хостов: для LAN 500–2000, для WAN/внешки 100–400.

  • Увеличивайте таймауты на «дальних» сегментах: --conn-timeout 1.0 --read-timeout 1.0.

  • Сканы по ночам/окнах обслуживания — меньше ложных тревог и нагрузка на SLA ниже.

  • Логируйте свои действия (внутренний change-record) — это помогает безопасникам и вам самим.


Тюнинг системы (коротко)

  • Linux: увеличьте лимиты дескрипторов (ulimit -n 65535), fs.file-max.

  • Windows: следите за TIME_WAIT и ephemeral port range; уменьшайте параллелизм, если видите «исчерпание портов».

  • Firewall: учитывайте rate-limits, IDS/IPS — могут резать агрессивные сканы.


Тестирование на столе

Поднимите локальные цели, чтобы увидеть, как детектятся сервисы:

# HTTP
python -m http.server 8080
# Redis (если установлен)
redis-server --port 6379
# SSH (на Linux обычно уже запущен на 22)

Проверьте:

python scanner.py -n 127.0.0.1/32 -p 22,6379,8080 --format plain

Ожидаемо увидите что-то вроде:

127.0.0.1:22 -> ssh
127.0.0.1:6379 -> redis
127.0.0.1:8080 -> http

Что ещё можно добавить (и почему я не впихивал в основу)

  • SYN-скан через Scapy/raw-сокеты — быстрее, но потребует прав и другой логики обработки.

  • TLS-сертификат (CN/SAN) — можно достать из ssl_object.getpeercert() и включить в вывод.

  • Фингерпринтинг: расширить сигнатуры, добавить SMB/NTLM-negotiate, RDP Cookie, MQTT CONNECT и т.д.

  • Параллельный UDP — отдельная история (и боль), но asyncio отлично масштабируется и туда.


Выводы

Самописный сканер — это не «замена nmap», а настройка под себя: быстрые проверки, интеграция в пайплайны, тонкий контроль нагрузки.
Два подхода (threads/asyncio) закрывают 95% «живых» кейсов в LAN и внутри периметра.

Готов обсудить расширения: TLS-метаданные, RDP/SMB-фингерпринтинг, экспорт в Prometheus/Elastic.


Приложение: команды для запуска

# Простой прогон по /27 и популярным портам, plain-вывод
python scanner.py -n 10.16.25.32/27

# JSON-вывод — удобно для пайплайна
python scanner.py -n 10.16.25.32/27 -f json > result.json

# «Мягкий» скан внешних хостов
python scanner.py -n 203.0.113.10/32 -p 22,80,443,8080-8090 \
  --conn-timeout 1.0 --read-timeout 1.0 -c 300

Комментарии (0)