Продолжаю развивать свой домашний сервачок, для удобного добавления сервисов понадобились поддомены . Так как за статический IP своему провайдеру я платить не хочу, то я использовал DDNS от TP-Link. И адрес выглядел https://my-adress.tplinkdns.com. TP-Link даёт только один поддомен и поддомены 2-го уровня создать нет возможности.

Поэтому думал использовать ddclient и Cloudfare. Выяснилось что Cloudfare больше не работает с ru зоной. Вот расисты!

Поискал хостеров с API для управления DNS-записями. Какое моё было удивление, что сейчас регистраторы требуют столько много за домен. Ещё и вводят в заблуждения всяческими способами. Первый год берут немного, а потом тысячами. Например REG.ru за домен на ru берёт в первый год 169 рублей, типо по акции и со скидкой 25%. А стоимость продления всячески скрывается, только в дебрях документации можно найти, что они возьмут 1424 рубля за следующие годы.

Хостер, которым я пользуюсь с 2014, не вводил меня в заблуждение. И я раньше платил за ru зону 99 рублей, шло время и сейчас мне обходится 299 рублей. Правда я нахожусь на архивном тарифе, которого давно уже нет.

В общем выбор пал на Beget. Прозрачные условия и я уже пользовался их впсками. Купил у них доменное имя в ru зоне за 199 рублей, продление будет стоить 420. Зарегал поддомены для своих текущих сервисов. На домене прописал A-запись, указал свой внешний IP. В поддоменах прописал CNAME, ведущий на основной домен.

У меня на сервере будет запускаться скрипт по таймеру, проверять внешний IP. Если он изменился, то менять A-запись у домена, через API Beget. Особенность апишки, при обновлении DNS записи обновляется весь набор записей для FQDN. Поэтому скрипт сначала получает данные о записи, а потом только обновляет её.

В скрипте есть логгирование и ротация логов производится этим же скриптом.

Переменные окружения скрипта: Обязательные:

  • BEGET_LOGIN — логин Beget API

  • BEGET_PASSWORD — пароль Beget API

  • BEGET_FQDN — корневой домен, например my-adress.ru Опциональные:

  • BEGET_A_PRIORITY — приоритет A (по умолчанию 10)

  • STATE_FILE — файл, куда запоминать последний IP (по умолчанию .state/last_ip.txt рядом со скриптом)

  • IP_URL — URL, который возвращает внешний IP в тексте (по умолчанию https://api.ipify.org) Логи:

  • LOG_FILE — путь к файлу лога (по умолчанию logs/beget-ddns.log рядом со скриптом)

  • LOG_MAX_BYTES — размер файла лога до ротации (по умолчанию 1_000_000)

  • LOG_BACKUP_COUNT — сколько ротированных файлов хранить (по умолчанию 5)

  • LOG_LEVEL — уровень логирования (INFO, DEBUG, …; по умолчанию INFO)

#!/usr/bin/env python3
from __future__ import annotations

import json
import logging
import logging.handlers
import os
import re
import sys
import time
import urllib.parse
import urllib.request
from pathlib import Path
from typing import Any, Dict, List, Tuple


BEGET_API_BASE = "https://api.beget.com/api/dns"
LOG = logging.getLogger("beget-ddns")


def _setup_logging() -> None:
    script_dir = Path(__file__).resolve().parent
    log_file = Path(os.environ.get("LOG_FILE", str(script_dir / "logs" / "beget-ddns.log"))).expanduser()
    log_file.parent.mkdir(parents=True, exist_ok=True)

    max_bytes = int(os.environ.get("LOG_MAX_BYTES", str(1_000_000)))
    backup_count = int(os.environ.get("LOG_BACKUP_COUNT", str(5)))
    level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
    level = getattr(logging, level_name, logging.INFO)

    LOG.setLevel(level)
    handler = logging.handlers.RotatingFileHandler(
        log_file,
        maxBytes=max_bytes,
        backupCount=backup_count,
        encoding="utf-8",
    )
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
    handler.setFormatter(formatter)
    LOG.addHandler(handler)

    # Also emit to stdout (useful under systemd/journal).
    stream = logging.StreamHandler(sys.stdout)
    stream.setFormatter(formatter)
    LOG.addHandler(stream)


def _env_required(name: str) -> str:
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required env var: {name}")
    return value


def _http_get_json(url: str, timeout_s: int = 20) -> Any:
    req = urllib.request.Request(url, headers={"User-Agent": "jhon-mosk-beget-ddns/1.0"})
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        body = resp.read()
    return json.loads(body.decode("utf-8"))


def _http_get_text(url: str, timeout_s: int = 20) -> str:
    req = urllib.request.Request(url, headers={"User-Agent": "jhon-mosk-beget-ddns/1.0"})
    with urllib.request.urlopen(req, timeout=timeout_s) as resp:
        body = resp.read()
    return body.decode("utf-8").strip()


def _beget_call(method: str, login: str, password: str, input_data: Dict[str, Any]) -> Any:
    query = {
        "login": login,
        "passwd": password,
        "input_format": "json",
        "output_format": "json",
        "input_data": json.dumps(input_data, ensure_ascii=False, separators=(",", ":")),
    }
    url = f"{BEGET_API_BASE}/{method}?{urllib.parse.urlencode(query)}"
    payload = _http_get_json(url)
    dump_path = os.environ.get("DEBUG_DUMP_JSON")
    if dump_path:
        try:
            Path(dump_path).expanduser().write_text(
                json.dumps(payload, ensure_ascii=False, indent=2) + "\n",
                encoding="utf-8",
            )
        except Exception as e:
            LOG.warning("failed to write DEBUG_DUMP_JSON=%s: %s", dump_path, e)
    return payload


def _unwrap_beget_payload(payload: Any) -> Any:
    """
    Beget API responses are not consistently documented.

    We try to normalize the response by unwrapping common envelopes:
    - {"answer": {...}} (with status, errors, and/or result/data)
    - {"answer": {"status": "error", "errors": [...]}} -> raise a helpful error
    """
    if isinstance(payload, dict) and "answer" in payload:
        answer = payload["answer"]
        if isinstance(answer, dict):
            status = answer.get("status")
            if status == "error":
                errors = answer.get("errors") or []
                # best-effort extract
                err_texts: List[str] = []
                if isinstance(errors, list):
                    for e in errors:
                        if isinstance(e, dict):
                            t = e.get("error_text") or e.get("text") or e.get("message")
                            if isinstance(t, dict):
                                t = t.get("text") or t.get("type") or str(t)
                            if t:
                                err_texts.append(str(t))
                        elif isinstance(e, str):
                            err_texts.append(e)
                msg = "; ".join(err_texts) or repr(errors) or "unknown error"
                raise RuntimeError(f"Beget API error: {msg}")
            # Success case: sometimes data is nested
            for key in ("result", "data"):
                if key in answer:
                    return answer[key]
        return answer

    return payload


def _extract_records_from_getdata(payload: Any) -> Dict[str, List[Dict[str, Any]]]:
    """
    Beget getData docs show an "array" response with a dict-like content. In practice
    the API may return either:
    - a list with one object/dict inside, or
    - a dict directly.
    We normalize to the records dict.
    """
    payload = _unwrap_beget_payload(payload)

    if isinstance(payload, list) and payload:
        obj = payload[0]
    else:
        obj = payload

    if not isinstance(obj, dict):
        raise RuntimeError(f"Unexpected getData response type: {type(obj)}")

    # Some shapes may nest actual object under known keys.
    for key in ("result", "data"):
        if key in obj and isinstance(obj[key], list) and obj[key]:
            candidate = obj[key][0]
            if isinstance(candidate, dict):
                obj = candidate
                break

    records = obj.get("records")
    if not isinstance(records, dict):
        LOG.debug("getData raw object keys: %s", sorted(obj.keys()))
        raise RuntimeError("getData response has no 'records' dict")

    out: Dict[str, List[Dict[str, Any]]] = {}
    for key in ("A", "MX", "TXT"):
        items = records.get(key, [])
        if items is None:
            items = []
        if not isinstance(items, list):
            raise RuntimeError(f"getData records.{key} is not a list")
        out[key] = items
    return out


def _normalize_mx_txt_from_getdata(records: Dict[str, List[Dict[str, Any]]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    getData returns different shapes than changeRecords expects.
    - changeRecords wants: [{"priority": <int>, "value": <str>}]
    - getData may return: [{"priority": "10", "value": "mx1..."}] (as in docs)
      or other internal keys (observed by community integrations).

    We support both by mapping known variants.
    """

    def pick_priority(item: Dict[str, Any]) -> int:
        for k in ("priority", "preference"):
            if k in item:
                return int(item[k])
        # fall back
        return 10

    def pick_value(item: Dict[str, Any], candidates: Tuple[str, ...]) -> str:
        for k in candidates:
            if k in item and item[k] is not None:
                return str(item[k])
        return ""

    mx_out: List[Dict[str, Any]] = []
    for mx in records.get("MX", []):
        if not isinstance(mx, dict):
            continue
        value = pick_value(mx, ("value", "exchange", "host"))
        if value:
            mx_out.append({"priority": pick_priority(mx), "value": value})

    txt_out: List[Dict[str, Any]] = []
    for txt in records.get("TXT", []):
        if not isinstance(txt, dict):
            continue
        value = pick_value(txt, ("value", "txtdata", "text"))
        if value is None:
            value = ""
        txt_out.append({"priority": pick_priority(txt), "value": value})

    return mx_out, txt_out


def _parse_ip(s: str) -> str:
    s = s.strip()
    if re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", s):
        return s
    raise RuntimeError(f"IP_URL did not return an IPv4 address. Got: {s!r}")


def main() -> int:
    _setup_logging()

    login = _env_required("BEGET_LOGIN")
    password = _env_required("BEGET_PASSWORD")
    fqdn = _env_required("BEGET_FQDN")

    ip_url = os.environ.get("IP_URL", "https://api.ipify.org")
    a_priority = int(os.environ.get("BEGET_A_PRIORITY", "10"))

    script_dir = Path(__file__).resolve().parent
    default_state_file = script_dir / ".state" / "last_ip.txt"
    state_file = Path(os.environ.get("STATE_FILE", str(default_state_file))).expanduser()
    state_file.parent.mkdir(parents=True, exist_ok=True)

    now = int(time.time())
    current_ip = _parse_ip(_http_get_text(ip_url))

    last_ip = None
    if state_file.exists():
        last_ip = state_file.read_text(encoding="utf-8").strip() or None

    if last_ip == current_ip:
        LOG.info("ip unchanged: %s", current_ip)
        return 0

    # Read existing records to preserve MX/TXT.
    LOG.info("ip changed: %s -> %s; updating %s", last_ip, current_ip, fqdn)
    getdata = _beget_call("getData", login, password, {"fqdn": fqdn})
    records = _extract_records_from_getdata(getdata)
    mx_out, txt_out = _normalize_mx_txt_from_getdata(records)

    change_payload = {
        "fqdn": fqdn,
        "records": {
            "A": [{"priority": a_priority, "value": current_ip}],
            "MX": mx_out,
            "TXT": txt_out,
        },
    }

    result = _beget_call("changeRecords", login, password, change_payload)

    # Beget docs show `true` on success. Some APIs wrap into objects.
    ok = False
    if result is True:
        ok = True
    elif isinstance(result, dict):
        # best-effort compatibility
        if result.get("answer") is True or result.get("result") is True:
            ok = True
        if result.get("answer", {}).get("status") == "success":
            ok = True

    if not ok:
        LOG.error("changeRecords unexpected response: %r", result)
        raise RuntimeError(f"Beget changeRecords failed or returned unexpected response: {result!r}")

    state_file.write_text(current_ip + "\n", encoding="utf-8")
    LOG.info("updated %s A -> %s (was %s)", fqdn, current_ip, last_ip)
    return 0


if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except Exception as e:
        _setup_logging()
        LOG.exception("fatal error: %s", e)
        raise SystemExit(1)

Для запуска скрипта создал systemd юнит и таймер:

/etc/systemd/system/beget-ddns.service:

Требуется прописать пользователя, группу, пути до рабочей директории, файла с переменными окружения и до исполняемого скрипта

[Unit]
Description=Beget DDNS updater (my-adress.ru A record)
After=network-online.target
Wants=network-online.target  

[Service]
Type=oneshot
User=your-user
Group=your-user-group
WorkingDirectory=/script/working/directory
EnvironmentFile=/path/to/environment/file.env
ExecStart=/usr/bin/python3 /path/to/script.py

/etc/systemd/system/beget-ddns.timer:

[Unit]
Description=Run Beget DDNS updater every 5 minutes
  
[Timer]
OnBootSec=2min
OnUnitActiveSec=5min
RandomizedDelaySec=30s
Persistent=true  

[Install]
WantedBy=timers.target

Для запуска:

sudo systemctl daemon-reload

sudo systemctl enable --now beget-ddns.timer

sudo systemctl start beget-ddns.service

Команды что бы проверить работу и глянуть логи:

sudo systemctl status beget-ddns.timer

sudo systemctl status beget-ddns.service

sudo systemctl list-timers --all | grep beget-ddns

sudo journalctl -u beget-ddns.service -n 200 --no-pager

tail -n 200 /path/to/log/file/beget-ddns.log

На сервачке у меня Caddy в качестве прокси и он сам создал сертификаты. Результатом доволен. Работает быстрее чем через TP-Link.

Комментарии (3)


  1. nerudo
    05.06.2026 12:08

    Это какие-то импортозамещенные технологии? 20 лет назад многие хостеры умели через запрос с ключом, автоматически настраивать новый адрес в dns. Всего-то надо было запускать wget по таймеру.


  1. Acidter
    05.06.2026 12:08

    Жаль, что мой ddns-скрипт из двух запросов curl и одной строки в crontab не тянет на статью, а то бы тоже написал.


  1. dmn13
    05.06.2026 12:08

    А чем не устраивает DuckDNS?