Зачем вообще «самописный» сканер
Nmap и masscan великолепны. Но в реальной жизни часто хочется мини-инструмент, который можно:
быстро встроить в свои пайплайны (CI, ночные проверки, health-чек задач),
тонко настроить под конкретную сеть/ограничения,
расширить под свои кейсы (например, сразу отправлять результат в Telegram/Prometheus/ELK).
В статье — практический разбор двух подходов: многопоточность на ThreadPoolExecutor
и асинхронщина на asyncio
. Плюс: баннер-граббинг, HTTP-проверка, простая TLS-детекция (версия протокола, ALPN), CLI, JSON/CSV-вывод и базовые приёмы «бережного» сканирования.
⚠️ Сканируйте только свои сети или с письменного разрешения владельца. Нагрузка на чужие хосты без согласования — плохая идея и, возможно, нарушение закона/политик.
Архитектура и выбор подхода
TCP connect-scan — самый «джентльменский» метод: мы делаем обычный connect()
. В отличие от SYN-скана (нужны сырые сокеты/права root и лучше выбирать Scapy/nmap), connect-скан на чистом Python прозрачен и переносим.
Threads (ThreadPoolExecutor) — просто, предсказуемо, хорошо для LAN/небольших диапазонов.
asyncio — меньше накладных расходов при массовых коннектах, тонкий контроль параллелизма, удобные таймауты и бэкпрешер.
Я покажу оба. Для «продакшен-миникомбайна» внизу — полный asyncio
-скрипт с CLI, форматами вывода и TLS-детекцией.
Быстрый вариант №1: ThreadPool + баннер-граббинг
Кейс: маленькая подсеть, хочется максимально «в лоб», без зависимостей.
import socket
import ipaddress
from concurrent.futures import ThreadPoolExecutor, as_completed
NETWORKS = ["10.16.14.5/32", "10.16.25.32/27"]
PORTS = range(1, 1025) # пример: только well-known
CONNECT_TIMEOUT = 0.5
READ_TIMEOUT = 0.5
THREADS = 800
SERVICE_BANNERS = [
(b"SSH-", "ssh"),
(b"FTP", "ftp"),
(b"SMTP", "smtp"),
(b"POP3", "pop3"),
(b"IMAP", "imap"),
(b"HTTP/", "http"),
(b"HTTP", "http"),
(b"Redis", "redis"),
(b"MySQL", "mysql"),
(b"PostgreSQL", "postgresql"),
(b"RTSP", "rtsp"),
(b"Telnet", "telnet"),
(b"LDAP", "ldap"),
(b"RFB", "vnc"),
(b"AMQP", "amqp"),
(b"SMB", "smb"),
]
def expand_targets(networks):
targets = []
for item in networks:
net = ipaddress.ip_network(item, strict=False)
targets.extend(str(h) for h in net.hosts())
return targets
def detect_banner(sock: socket.socket) -> bytes:
try:
sock.settimeout(READ_TIMEOUT)
return sock.recv(512)
except (socket.timeout, OSError):
return b""
def detect_protocol(sock: socket.socket) -> str:
b = detect_banner(sock)
for sig, name in SERVICE_BANNERS:
if sig in b:
return name
# HTTP HEAD как «активная» проба
try:
sock.sendall(b"HEAD / HTTP/1.0\r\n\r\n")
sock.settimeout(READ_TIMEOUT)
data = sock.recv(128)
if data.startswith(b"HTTP/"):
return "http"
except (socket.timeout, OSError):
pass
return "unknown"
def scan_port(ip: str, port: int):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(CONNECT_TIMEOUT)
if s.connect_ex((ip, port)) != 0:
return None
try:
service = socket.getservbyport(port, "tcp")
except OSError:
service = "unknown"
if service == "unknown":
service = detect_protocol(s)
return {"ip": ip, "port": port, "service": service}
def main():
ips = expand_targets(NETWORKS)
results = []
with ThreadPoolExecutor(max_workers=THREADS) as pool:
futures = (pool.submit(scan_port, ip, p) for ip in ips for p in PORTS)
for f in as_completed(futures):
r = f.result()
if r:
results.append(r)
print(f'{r["ip"]}:{r["port"]} -> {r["service"]}')
if __name__ == "__main__":
main()
Плюсы: мини-код, легко читать и отлаживать.
Минусы: при тысячах одновременных коннектов треды прожорливы, выше накладные.
Производственный вариант №2: asyncio + семафоры + TLS-детекция
Что добавим по сравнению с «быстрым» вариантом:
CLI (
argparse
): сети, порты (диапазоны80,443,8000-8100
), таймауты, параллелизм.Параллелизм через семафор: честный контроль нагрузки.
Баннер-граббинг и HTTP-HEAD.
TLS-детекция: пытаемся выполнить TLS-рукопожатие, собираем версию протокола и ALPN.
Форматы вывода:
plain
(текст),json
,csv
.Мягкие таймауты и бэкпрешер, чтобы не «класть» сеть.
import argparse
import asyncio
import ipaddress
import json
import re
import ssl
import sys
from typing import Iterable, List, Tuple, Dict, Any
SERVICE_BANNERS: List[Tuple[bytes, str]] = [
(b"SSH-", "ssh"), (b"FTP", "ftp"), (b"SMTP", "smtp"),
(b"POP3", "pop3"), (b"IMAP", "imap"), (b"HTTP/", "http"),
(b"HTTP", "http"), (b"Redis", "redis"), (b"MySQL", "mysql"),
(b"PostgreSQL", "postgresql"), (b"RTSP", "rtsp"), (b"Telnet", "telnet"),
(b"LDAP", "ldap"), (b"RFB", "vnc"), (b"AMQP", "amqp"), (b"SMB", "smb"),
]
TLS_PORTS_DEFAULT = {443, 8443, 9443, 7443, 10443}
def parse_ports(spec: str) -> List[int]:
"""
Пример: "22,80,443,8000-8100"
"""
ports = set()
for part in spec.split(","):
part = part.strip()
if not part:
continue
if "-" in part:
a, b = part.split("-", 1)
ports.update(range(int(a), int(b) + 1))
else:
ports.add(int(part))
return sorted(p for p in ports if 1 <= p <= 65535)
def expand_targets(net_specs: Iterable[str]) -> List[str]:
ips: List[str] = []
for it in net_specs:
it = it.strip()
if not it:
continue
# поддерживаем и одиночные ip
if re.match(r"^\d+\.\d+\.\d+\.\d+$", it):
ips.append(it)
continue
net = ipaddress.ip_network(it, strict=False)
ips.extend(str(h) for h in net.hosts())
return ips
async def try_read_banner(reader: asyncio.StreamReader, n: int, timeout: float) -> bytes:
try:
return await asyncio.wait_for(reader.read(n), timeout=timeout)
except asyncio.TimeoutError:
return b""
except Exception:
return b""
def detect_from_banner(b: bytes) -> str:
for sig, name in SERVICE_BANNERS:
if sig in b:
return name
return "unknown"
async def try_http_probe(reader, writer, timeout: float) -> bool:
try:
writer.write(b"HEAD / HTTP/1.0\r\nHost: localhost\r\n\r\n")
await writer.drain()
data = await asyncio.wait_for(reader.read(128), timeout=timeout)
return data.startswith(b"HTTP/")
except Exception:
return False
async def try_tls(host: str, port: int, timeout: float) -> Dict[str, Any] | None:
"""
Пытаемся установить TLS (SNI=host). Если успешно — вернём версию TLS и ALPN.
"""
ctx = ssl.create_default_context()
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host=host, port=port, ssl=ctx, server_hostname=host),
timeout=timeout
)
sslobj = writer.get_extra_info("ssl_object")
info = {
"tls": getattr(sslobj, "version", lambda: None)(),
"alpn": getattr(sslobj, "selected_alpn_protocol", lambda: None)(),
}
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
return info
except Exception:
return None
async def scan_one(host: str, port: int, conn_timeout: float, read_timeout: float,
tls_guess: bool, sem: asyncio.Semaphore) -> Dict[str, Any] | None:
async with sem:
try:
reader, writer = await asyncio.wait_for(
asyncio.open_connection(host=host, port=port),
timeout=conn_timeout
)
except Exception:
return None
result: Dict[str, Any] = {"ip": host, "port": port, "service": "unknown"}
# 1) Пассивный баннер
b = await try_read_banner(reader, 512, read_timeout)
srv = detect_from_banner(b)
if srv != "unknown":
result["service"] = srv
else:
# 2) HTTP-проба
ok = await try_http_probe(reader, writer, read_timeout)
if ok:
result["service"] = "http"
writer.close()
try:
await writer.wait_closed()
except Exception:
pass
# 3) TLS-детекция (по порту или по факту неизвестного сервиса)
if tls_guess and (port in TLS_PORTS_DEFAULT or result["service"] == "unknown"):
tls_info = await try_tls(host, port, read_timeout)
if tls_info:
result["service"] = "https"
result.update(tls_info)
# 4) getservbyport как подсказка (последним слоем)
if result["service"] == "unknown":
try:
import socket
result["service"] = socket.getservbyport(port, "tcp")
except Exception:
pass
return result
async def run(ips: List[str], ports: List[int], conn_timeout: float, read_timeout: float,
concurrency: int, tls_guess: bool, fmt: str) -> None:
sem = asyncio.Semaphore(concurrency)
tasks = [
scan_one(ip, p, conn_timeout, read_timeout, tls_guess, sem)
for ip in ips for p in ports
]
if fmt == "json":
out = []
for coro in asyncio.as_completed(tasks):
r = await coro
if r:
out.append(r)
print(json.dumps(out, ensure_ascii=False, indent=2))
return
if fmt == "csv":
print("ip,port,service,tls,alpn")
for coro in asyncio.as_completed(tasks):
r = await coro
if r:
print(f'{r["ip"]},{r["port"]},{r.get("service","")},{r.get("tls","")},{r.get("alpn","")}')
return
# plain
for coro in asyncio.as_completed(tasks):
r = await coro
if r:
extra = []
if "tls" in r and r["tls"]:
extra.append(f'TLS={r["tls"]}')
if "alpn" in r and r["alpn"]:
extra.append(f'ALPN={r["alpn"]}')
suffix = f' ({", ".join(extra)})' if extra else ""
print(f'{r["ip"]}:{r["port"]} -> {r["service"]}{suffix}')
def build_cli():
ap = argparse.ArgumentParser(description="Async TCP port scanner with banner & TLS detection")
ap.add_argument("-n", "--net", action="append", required=True,
help="Сеть/адрес: пример 10.0.0.0/24 или 10.0.0.5. Можно несколько флагов.")
ap.add_argument("-p", "--ports", default="1-1024,3389,5432,6379,8080-8090,8443",
help="Порты: 22,80,443,8000-8100 (по умолчанию: популярные)")
ap.add_argument("--conn-timeout", type=float, default=0.5, help="Таймаут установления TCP (сек)")
ap.add_argument("--read-timeout", type=float, default=0.5, help="Таймаут чтения баннера/HTTP/TLS (сек)")
ap.add_argument("-c", "--concurrency", type=int, default=1000, help="Параллелизм (кол-во одновременных коннектов)")
ap.add_argument("--no-tls-guess", action="store_true", help="Отключить попытки TLS-детекции")
ap.add_argument("-f", "--format", choices=["plain","json","csv"], default="plain", help="Формат вывода")
return ap
def main():
ap = build_cli()
args = ap.parse_args()
ips = expand_targets(args.net)
ports = parse_ports(args.ports)
try:
asyncio.run(run(
ips=ips, ports=ports,
conn_timeout=args.conn_timeout,
read_timeout=args.read_timeout,
concurrency=args.concurrency,
tls_guess=not args.no_tls_guess,
fmt=args.format
))
except KeyboardInterrupt:
print("\nInterrupted.", file=sys.stderr)
if __name__ == "__main__":
main()
Что получает читатель «поверх кода»
Управление нагрузкой:
--concurrency
задаёт одновременные соединения, семафор гарантирует верхнюю границу.Баннер-граббинг: читаем до 512 байт — хватает, чтобы поймать SSH/SMTP/Redis/… .
HTTP-проба: если сервер молчит — мягко «тычем»
HEAD /
и распознаёмHTTP/…
.TLS-детекция: пытаемся установить TLS (SNI=хост/IP). Если ок — получаем
TLSv1.2/1.3
и ALPN (http/1.1
,h2
).Форматы:
plain
для консоли,json
иcsv
— для пайплайнов/импорта.
Как сканировать «бережно»
Подбирайте
--concurrency
под RTT и мощность хостов: для LAN 500–2000, для WAN/внешки 100–400.Увеличивайте таймауты на «дальних» сегментах:
--conn-timeout 1.0 --read-timeout 1.0
.Сканы по ночам/окнах обслуживания — меньше ложных тревог и нагрузка на SLA ниже.
Логируйте свои действия (внутренний change-record) — это помогает безопасникам и вам самим.
Тюнинг системы (коротко)
Linux: увеличьте лимиты дескрипторов (
ulimit -n 65535
),fs.file-max
.Windows: следите за
TIME_WAIT
и ephemeral port range; уменьшайте параллелизм, если видите «исчерпание портов».Firewall: учитывайте rate-limits, IDS/IPS — могут резать агрессивные сканы.
Тестирование на столе
Поднимите локальные цели, чтобы увидеть, как детектятся сервисы:
# HTTP
python -m http.server 8080
# Redis (если установлен)
redis-server --port 6379
# SSH (на Linux обычно уже запущен на 22)
Проверьте:
python scanner.py -n 127.0.0.1/32 -p 22,6379,8080 --format plain
Ожидаемо увидите что-то вроде:
127.0.0.1:22 -> ssh
127.0.0.1:6379 -> redis
127.0.0.1:8080 -> http
Что ещё можно добавить (и почему я не впихивал в основу)
SYN-скан через Scapy/raw-сокеты — быстрее, но потребует прав и другой логики обработки.
TLS-сертификат (CN/SAN) — можно достать из
ssl_object.getpeercert()
и включить в вывод.Фингерпринтинг: расширить сигнатуры, добавить SMB/NTLM-negotiate, RDP Cookie, MQTT CONNECT и т.д.
Параллельный UDP — отдельная история (и боль), но
asyncio
отлично масштабируется и туда.
Выводы
Самописный сканер — это не «замена nmap», а настройка под себя: быстрые проверки, интеграция в пайплайны, тонкий контроль нагрузки.
Два подхода (threads/asyncio) закрывают 95% «живых» кейсов в LAN и внутри периметра.
Готов обсудить расширения: TLS-метаданные, RDP/SMB-фингерпринтинг, экспорт в Prometheus/Elastic.
Приложение: команды для запуска
# Простой прогон по /27 и популярным портам, plain-вывод
python scanner.py -n 10.16.25.32/27
# JSON-вывод — удобно для пайплайна
python scanner.py -n 10.16.25.32/27 -f json > result.json
# «Мягкий» скан внешних хостов
python scanner.py -n 203.0.113.10/32 -p 22,80,443,8080-8090 \
--conn-timeout 1.0 --read-timeout 1.0 -c 300