FT8 — цифровой радиолюбительский протокол, разработанный Джо Тейлором (K1JT) и Стивом Франке (K9AN) в 2017 году. В этой статье будут рассмотрены подробности работы протокола.

Статья может быть интересна радиолюбителям, как знакомым, так и не знакомым с протоколами FT8 и FT4, а также тем, кто хочет в подробностях понять устройство этих протоколов.

Введение

Протокол FT8 был разработан радиолюбителями Джо Тейлором (K1JT) и Стивом Франке (K9AN) в 2017 году, чьи инициалы были увековечены в названии протокола.

Разработчики протокола преследовали цель обеспечить связь при очень низком уровне сигнала, в том числе ниже порога слышимости человека; в плохих условиях прохождения радиосигнала; при использовании передатчиков низкой мощности; с использованием узких полос частот для приема и передачи сигнала.

Для обеспечения связи при вышеописанных условиях, формируемый сигнал сильно растянут во времени (порядка 15 секунд на передачу) и, как следствие, имеет низкий битрейт. Исходя из этого, протокол был спроектирован для работы в тайм-слотах (временные интервалы), поэтому работа протокола должна быть синхронизирована с точностью до секунд с мировым временем.

FT4 был разработан в 2019 году и является более быстрой версией протокола FT8. Основные цели разработки протокола: увеличения количества QSO за счёт уменьшения времени передачи одного сообщения; проведения радиосвязей в условиях соревнований.

Из-за повышенной скорости протокол имеет более широкую полосу частот.

Протоколы используют GFSK (Gaussian Frequency Shift Keying) — частотную манипуляцию с использованием фильтра Гаусса. Цифры в названии протокола определяют количество тонов, используемых при FSK.

Общие технические характеристики

FT8

  • Цикл передачи 15 секунд;

  • Скорость передачи 6,09 бит/с;

  • Ширина полосы 50 Гц;

  • Размер сообщения 77 бит + 12-бит CRC-14;

  • Механизм коррекции ошибок LDPC (174,87);

  • Модуляция 8-GFSK с шагом 6.25Гц;

  • Порог декодирования сигналов до -24…-26 дБ относительно шума.

FT4

  • Цикл передачи 7.5 секунд;

  • Ширина полосы 90 Гц;

  • Размер сообщения 77 бит + 12-бит CRC-14;

  • Механизм коррекции ошибок LDPC (174,87);

  • Модуляция 4-GFSK с шагом 23.4Гц;

  • Порог декодирования сигналов до -24…-26 дБ относительно шума.

Структура протокола

Как и любой протокол передачи данных, FT8 и FT4 представляю собой «матрешку», в которой данные одного уровня преобразуют данные для передачи уровнем ниже.

Общая схема протокола приведена на рисунке 1. Разделение на уровни модели OSI условное.

Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).

На рисунке 1.А приведена схема взаимодействия алгоритмов кодирования при формировании исходящего сигнала; на рисунке 1.B — схема обработки и декодирования принятого сигнала.

Протокол прикладного уровня

При проведении связей в FT8/FT4 участники разделяются на принимающих и передающих.

Так как протокол привязан к временным интервалам, прием и передача осуществляются в четные и нечетные четверти (⅛ в случае FT4) минут, передающие корреспонденты излучают сигнал, в то время как в этот временной промежуток осуществляется прием и накопление сигнала на стороне слушающих; в следующем временном промежутке происходит смена ролей, принимавшие сигнал передают ответ (если посчитают нужным), а передавшие свой сигнал становятся слушателями и т.д. цикл повторяется. Модульная привязка к секундам необходима для избежания ситуации, когда корреспонденты начинают передавать в произвольном порядке и расшифровка сигнала становится невозможной.

Обмен данными сильно похож на обычный любительский радиообмен, когда часть корреспондентов дает общий вызов, а остальные слушают и принимают решение о попытке проведения связи с этим участником.

Сценарий обычного сеанса связи:

Временной отрезок

Данные

Комментарий

00:00-00:15

CQ R3AAA KO85

Корреспондент из Москвы (локатор KO85), с позывным R3AAA приглашает для установления связи

00:15-00:30

R3AAA R9FXX LO87

Отвечает корреспондент из Перми (локатор LO87) с позывным R9FXX

00:30-00:45

R9FXX R3AAA -1

R3AAA дает рапорт для R9FXX в -1 dB по SNR

00:45-01:00

R3AAA R9FXX R-4

R9FXX дает рапорт для R3AAA в -4 dB по SNR

01:00-01:15

R9FXX R3AAA RR73

R3AAA подтверждает прием метаданных и прощается

01:15-01:30

R3AAA R9FXX 73

R9FXX взаимно отправляет 73

Общее время проведения связи наняло 1 минуту 30 секунд.

Сценарий короткого сеанса связи:

Временной отрезок

Данные

Комментарий

00:00-00:15

CQ R3AAA KO85

Корреспондент из Москвы (локатор KO85), с позывным R3AAA приглашает для установления связи

00:15-00:30

R3AAA R9FXX R-4

R9FXX дает рапорт для R3AAA в -4 dB по SNR

00:30-00:45

R9FXX R3AAA R-1

R3AAA дает рапорт для R9FXX в -1 dB по SNR

00:45-01:00

R3AAA R9FXX RR73

R9FXX подтверждает прием метаданных и прощается

01:00-01:15

R9FXX R3AAA 73

R3AAA взаимно отправляет 73

Сокращенные сеансы применяются в случаях когда у дальнего корреспондента большая очередь желающих провести связь.

В случаях когда кто-то из участников не может с первого раза получить ответ, сообщение может быть отослано повторно и сеанс связи потребует большего времени.

Пример сеанса радиосвязи R9FEU с R1CDY:

Время UTC

SNR

Δt

Частота

Данные

141945

Tx

1000

CQ R9FEU LO87

142000

-3

0.1

1001

R9FEU R1CDY KP40

142015

Tx

1000

R1CDY R9FEU -03

142030

1

0.1

1001

R9FEU R1CDY R-17

142045

Tx

1000

R1CDY R9FEU RR73

142100

-9

0.1

1001

R9FEU R1CDY 73

Пример сокращенного сеанса радиосвязи R9FEU c UA3DOI:

Время UTC

SNR

Δt

Частота

Данные

141745

Tx

1000

CQ R9FEU LO87

141800

8

0.1

1001

R9FEU UA3DOI -01

141815

Tx

1000

UA3DOI R9FEU R+08

141830

-5

0.1

1001

R9FEU UA3DOI RR73

141845

Tx

1000

UA3DOI R9FEU 73

Уровень представления

На уровне представления происходит подготовка данных, в частности перекодирование информации и расчет контрольной суммы сообщения CRC-14.

Формат сообщений

В основном, сообщения, передаваемые протоколами семейства FTX делятся на:

  • радиолюбительский обмен (QSO);

  • телеметрия;

  • произвольный текст.

Все радиолюбительские сообщения состоят из групп: вызываемый (call_to), вызывающий (call_de), данные (extra). В качестве данных может передаваться SNR-рапорт, четырех-символьный квадрат локации и сигналы roger/73.

Кодирование сообщения

Большинство сообщений в протоколе состоят из трех полей: вызываемый, вызывающий и доп. данные (например рапорт или гео данные).

Кодирование заключается в том, чтобы максимально плотно уместить данные исходного сообщения в 77 бит.

Определение констант и утилит, необходимых для кодирования:

import string


FTX_CHAR_TABLE_NUMERIC = string.digits
FTX_CHAR_TABLE_LETTERS = string.ascii_uppercase
FTX_CHAR_TABLE_ALPHANUM = f"{FTX_CHAR_TABLE_NUMERIC}{FTX_CHAR_TABLE_LETTERS}"
FTX_CHAR_TABLE_LETTERS_SPACE = f" {FTX_CHAR_TABLE_LETTERS}"
FTX_CHAR_TABLE_ALPHANUM_SPACE = f" {FTX_CHAR_TABLE_ALPHANUM}"
FTX_CHAR_TABLE_ALPHANUM_SPACE_SLASH = f"{FTX_CHAR_TABLE_ALPHANUM_SPACE}/"
FTX_CHAR_TABLE_FULL = f"{FTX_CHAR_TABLE_ALPHANUM_SPACE}+-./?"


def charn(c: int, table: str) -> str:
   return table[c]


def nchar(c: str, table: str) -> int:
   return table.find(c)

В протоколах семейства FTX алфавит состоит из ASCII символов «ABCDEFGHIJKLMNOPQRSTUVWXYZ», цифр «0123456789» и символов «+-./?», включая знак пробела.

Полный алфавит содержится в FTX_CHAR_TABLE_FULL.

В алфавите FTX символу A соответствует значение в 1, B — 2 и т.д.. Для перевода из ANSI в алфавит FTX используется функция nchar, которая по своей сути сопоставляет индекс кодируемого символа c в таблице table, которой является одно из представлений значений FTX_CHAR_TABLE_*. В случае обработки неподдерживаемого символа, функция возвращает значение -1.

Для декодирования из формата FTX используется функция charn, которая решает обратную задачу, возвращает ANSI символ, соответствующей позиции c в table.

Кодирование позывного сигнала

Для кодирования позывного сигнала его необходимо нормализовать до 6-и символьного. Короткие позывные дополняются пробелами в начале строки, а длинные приводятся к другому виду. Швейцарские позывные из формата 3DA0XYZ нормализуются в 3D0XYZ, африканские 3XA0XYZ в QA0XYZ.

Функция кодирования позывного сигнала:

FTX_BASECALL_CHAR_MAP = [
   FTX_CHAR_TABLE_ALPHANUM_SPACE,
   FTX_CHAR_TABLE_ALPHANUM,
   FTX_CHAR_TABLE_NUMERIC,
   FTX_CHAR_TABLE_LETTERS_SPACE,
   FTX_CHAR_TABLE_LETTERS_SPACE,
   FTX_CHAR_TABLE_LETTERS_SPACE
]


def pack_basecall(callsign: str) -> int:
   if (length := len(callsign)) > 2:
       if callsign.startswith("3DA0") and 4 < length <= 7:
           cs_6 = f"3D0{callsign[4:]}"
       elif callsign.startswith("3X") and callsign[2].isalpha() and length <= 7:
           cs_6 = f"Q{callsign[2:]}"
       elif callsign[2].isdigit() and length <= 6:
           cs_6 = callsign
       elif callsign[1].isdigit() and length <= 5:
           cs_6 = f" {callsign}"
       else:
           cs_6 = " " * 6

       cs_6 = cs_6 + " " * (6 - len(cs_6))

       n_chars = list(map(nchar, cs_6, FTX_BASECALL_CHAR_MAP))

       if all(nc >= 0 for nc in n_chars):
           n = reduce(lambda a, it: a * len(it[0]) + it[1], zip(FTX_BASECALL_CHAR_MAP, n_chars), 0)
           return n
   return -1

В FTX_BASECALL_CHAR_MAP определяется последовательность алфавитов, на основе которых будет производиться кодирование позывного сигнала, таким образом, первая цифро-буквенная литера является опциональной; третьим символом обязательно должно быть цифровое значение, далее следуют буквенные символы, включая пробел.

Функция pack_basecall на вход принимает строку с позывным сигналом, длина которого должна быть минимум 3 знака. Далее позывной нормализуется до 6-и символьного. В n_chars содержатся закодированные по схеме FTX_BASECALL_CHAR_MAP символы; затем, если перекодирование всех символов прошло успешно, выполняется битовая упаковка данных, путем умножения результирующего значения на мощности используемых алфавитов.

При кодировании позывного R1ABC происходит нормализация к виду ' R1ABC', Индексы символов: [0, 27, 1, 1, 2, 3]; результат: 5334879 (0b10100010110011101011111).

Декодирование позывного сигнала

Операция декодирования позывного сигнала выполняет в обратном порядке все те же самые операции, что и операция кодирования.

NTOKENS = 2063592
MAX22 = 4194304


FTX_BASECALL_SUFFIX_FMT = {
   1: "{cs}/R",
   2: "{cs}/P",
}
FTX_TOKEN_STR = {
   v: k for k, v in FTX_TOKEN_CODE.items()
}


def unpack_callsign(cs_28: int, flags: bool, suffix: int) -> typing.Optional[str]:
   if cs_28 < NTOKENS:
       if cs_28 <= 2:
           return FTX_TOKEN_STR.get(cs_28)
       if cs_28 <= 1002:
           return f"CQ_{cs_28 - 3:03}"
       if cs_28 <= 532443:
           n = cs_28 - 1003
           aaaa = ""
           for i in range(4):
               ct = FTX_CHAR_TABLE_LETTERS_SPACE
               ct_l = len(ct)
               aaaa = charn(n % ct_l, ct) + aaaa
               n //= ct_l

           return f"CQ_{aaaa.strip()}"
       return None

   cs_28 -= NTOKENS
   n = cs_28 - MAX22

   callsign = ""
   for ct in reversed(FTX_BASECALL_CHAR_MAP):
       ct_l = len(ct)
       callsign = charn(n % ct_l, ct) + callsign
       n //= ct_l

   callsign = callsign.strip()

   if callsign.startswith("3D0") and callsign[3] != " ":
       result = f"3DA0{callsign[3:]}"
   elif callsign[0] == "Q" and callsign[1].isalpha():
       result = f"3X{callsign[1:]}"
   else:
       result = callsign

   if len(result) < 3:
       return None

   if flags:
       if fmt := FTX_BASECALL_SUFFIX_FMT.get(suffix):
           return fmt.format(cs=result)
       raise ValueError

   return result

Функция unpack_callsign выполняет распаковку бит в строку, в соответствии с таблицами символов. Декодирование символов происходит в обратном порядке, reversed(FTX_BASECALL_CHAR_MAP). Функция также проверяет, являются ли переданные данные шаблонными значениями, такими как CQ/DE/QRZ, перечисленными в FTX_TOKEN_STR.

Также дополнительно происходит добавление суффиксов, если указан параметр flags (суффиксы /R, /P из FTX_CALLSIGN_SUFFIX_FMT). Длинные позывные, подвергшиеся сокращению на этапе кодирования, также восстанавливаются до первоначального вида.

Результатом работы функции является строка, содержащая позывной сигнал в текстовом виде.

Кодирование локации

При установлении связи, корреспонденты обмениваются геолокационными квадратами (QTH Loc), представляющие собой строку из чередующихся пар букв и цифр, каждая пара уточняет границы расположения географической точки.

Расчет значений:

  1. пара букв:

    • долгота делится на 20, целая часть от 0 до 17 кодируется буквами от A до R;

    • широта делится на 10, целая часть от 0 до 17 кодируется буквами от A до R;

  2. пара цифр:

    • остаток долготы от деления на 20 делится на 2;

    • остаток широты от деления на 10 делится на 1.

FTX_CHAR_TABLE_GRID_LETTERS = FTX_CHAR_TABLE_LETTERS[:18]
FTX_GRID_CHAR_MAP = [
   FTX_CHAR_TABLE_GRID_LETTERS,
   FTX_CHAR_TABLE_GRID_LETTERS,
   FTX_CHAR_TABLE_NUMERIC,
   FTX_CHAR_TABLE_NUMERIC
]


def pack_grid(grid4: str) -> int:
   n_chars = list(map(nchar, grid4, FTX_GRID_CHAR_MAP))
   n = reduce(lambda a, it: a * len(it[0]) + it[1], zip(FTX_GRID_CHAR_MAP, n_chars), 0)
   return n

Для кодирования квадрата используется усеченный алфавит FTX_CHAR_TABLE_GRID_LETTERS и карта соответствия FTX_GRID_CHAR_MAP.

Аналогично функции pack_basecall, функция pack_grid переводит четырехсимвольный квадрат в число; например квадрат KO85 (Москва) будет закодирован в 19485 (0b100110000011101).

Кодирование рапорта

Так как третьей частью сообщения может быть как геолокационный квадрат или рапорт, так же может быть и roger-ответ; при кодировании необходимо отличать тип передаваемых данных.

FTX_MAX_GRID_4 = 32400
FTX_RESPONSE_EXTRAS_CODE = {
   "": FTX_MAX_GRID_4 + 1,
   "RRR": FTX_MAX_GRID_4 + 2,
   "RR73": FTX_MAX_GRID_4 + 3,
   "73": FTX_MAX_GRID_4 + 4,
}


def pack_extra(extra: str) -> int:
   if id_resp := FTX_RESPONSE_EXTRAS_CODE.get(extra):
       return id_resp
   if re.match(r"^(([A-R]{2})([0-9]{2}))$", extra):
       return pack_grid(extra)
   if not (report := re.match(r"^(R){0,1}([\+\-]{0,1}[0-9]+)$", extra)):
       raise FTXInvalidRST

   r_sign, r_val = report.groups()
   i_report = int(r_val) + 35
   return (FTX_MAX_GRID_4 + i_report) | (0x8000 if r_sign is not None else 0)

Функция pack_extra сначала проверяет признак, не являются ли переданные данные roger-ответом, далее, если данные соответствуют регулярному выражению с геолокационным квадратом, осуществляется вызов ранее рассмотренной функции pack_grid; в противном случае переданные данные интерпретируются как SNR-рапорт.

Декодирование рапорта и локации

Операция декодирования рапорта/локации, как и вышеописанная операции декодирования позывного сигнала, также в обратном порядке выполняет действия, осуществляемые при кодировании.

FTX_RESPONSE_EXTRAS_STR = {
   v: k for k, v in FTX_RESPONSE_EXTRAS_CODE.items()
}


def unpack_extra(ex_16: int, is_report: bool) -> typing.Optional[str]:
   if ex_16 <= FTX_MAX_GRID_4:
       n = ex_16
       dst = ""
       for ct in reversed(FTX_GRID_CHAR_MAP):
           ct_l = len(ct)
           dst = charn(n % ct_l, ct) + dst
           n //= ct_l

       return f"{'R ' if is_report else ''}{dst}"
   else:
       if irpt := FTX_RESPONSE_EXTRAS_STR.get(ex_16):
           return irpt

       return f"{'R' if is_report else ''}{int(ex_16 - FTX_MAX_GRID_4 - 35):+03}"

Функция unpack_extra проверяет, являются ли переданные данные локатором или SNR-рапортом. Декодирование локации осуществляется с конца к началу.

При передаче рапорта, дополнительно, осуществляется проверка на шаблонный ответ из FTX_RESPONSE_EXTRAS_STR.

Результатом работы функции является строка, содержащая либо идентификатор локации, либо рапорт (или шаблонный ответ, например RR73).

Кодирование стандартного сообщения

Стандартное сообщение состоит из трех частей:

  • общий вызов или позывной вызываемого;

  • позывной вызываемого;

  • квадрат локации или рапорт/roger-сигнал.

NTOKENS = 2063592
MAX22 = 4194304
FTX_TOKEN_CODE = {
   "DE": 0,
   "QRZ": 1,
   "CQ": 2
}


def byte(i: int) -> int:
   return i & 0xff


def endswith_any(s: str, *tails: str) -> bool:
    return any(s.endswith(tail) for tail in tails)


def pack_callsign(callsign: str) -> typing.Tuple[int, int]:
   shift = 0
   if token := FTX_TOKEN_CODE.get(callsign):
       return token, shift

   length = len(callsign)
   if callsign.startswith("CQ_") and length < 8:
       rest = callsign[3:]
       rest_len = len(rest)

       if rest_len == 3 and rest.isdigit():
           return int(rest) + 3, shift

       if 1 <= rest_len <= 4:
           nlet = 0
           correct = True
           for c in rest:
               if (n := nchar(c, FTX_CHAR_TABLE_LETTERS_SPACE)) == -1:
                   correct = False
                   break
               nlet = nlet * 27 + n

           if correct:
               return nlet + 1003, shift

   length_base = length
   if endswith_any(callsign, "/P", "/R"):
       shift = 1
       length_base = length - 2

   if (n28 := pack_basecall(callsign[:length_base])) >= 0:
       return dword(NTOKENS + MAX22 + n28), shift

   raise FTXPackCallsignError


def ftx_message_encode_std(call_to: str, call_de: str, extra: str) -> typing.ByteString:
   b28_to, sh_to = pack_callsign(call_to)
   if b28_to < 0:
       raise FTXErrorCallSignTo

   b28_de, sh_de = pack_callsign(call_de)
   if b28_de < 0:
       raise FTXErrorCallSignDe

   suffix = 1
   if any(cs.endswith("/P") for cs in (call_to, call_de)):
       suffix = 2
       if any(cs.endswith("/R") for cs in (call_to, call_de)):
           raise FTXErrorSuffix

   if call_to == "CQ" and "/" in call_de and not endswith_any(call_de, "/P", "/R"):
       raise FTXErrorCallSignDe

   b16_extra = pack_extra(extra)

   b29_to = dword(b28_to << 1 | sh_to)
   b29_de = dword(b28_de << 1 | sh_de)

   if endswith_any(call_to, "/P", "/R"):
       b29_to |= 1
       if call_to.endswith("/P"):
           suffix = 2

   bytes = [
       byte(b29_to >> 21),
       byte(b29_to >> 13),
       byte(b29_to >> 5),
       byte(b29_to << 3) | byte(b29_de >> 26),
       byte(b29_de >> 18),
       byte(b29_de >> 10),
       byte(b29_de >> 2),
       byte(b29_de << 6) | byte(b16_extra >> 10),
       byte(b16_extra >> 2),
       byte(b16_extra << 6) | byte(suffix << 3)
   ]
   return bytearray(b for b in bytes)

Функция ftx_message_encode_std принимает на вход три аргумента с вышеописанными данными. При кодировании позывных сигналов, функция pack_callsign проверяет, является ли позывной специальным токеном (например CQ при общем вызове), для которых, для повышения плотности, присвоены специальные коды в FTX_TOKEN_CODE; далее, используя вышеописанные функции битового кодирования, позывной кодируется в набор бит в виде целого числа, после чего данные конкатенируются в массив байт.

Суффиксы позывных сигналов отмечаются в третьем блоке данных сообщения. Код byte(suffix << 3) записывает в сообщение признак того, что это стандартное сообщение. Значение «1» суффикса соответствует обычному сообщению; значение «2» — проведение связей в формате DX-педиции.

Перечень типов сообщений:

  • FTX_MESSAGE_TYPE_FREE_TEXT — 1;

  • FTX_MESSAGE_TYPE_DXPEDITION — 2;

  • FTX_MESSAGE_TYPE_EU_VHF — 3;

  • FTX_MESSAGE_TYPE_ARRL_FD — 4;

  • FTX_MESSAGE_TYPE_TELEMETRY — 6;

  • FTX_MESSAGE_TYPE_CONTESTING — 7;

  • FTX_MESSAGE_TYPE_STANDARD — 8;

  • FTX_MESSAGE_TYPE_ARRL_RTTY — 10;

  • FTX_MESSAGE_TYPE_NONSTD_CALL — 11;

  • FTX_MESSAGE_TYPE_WWROF — 12.

Примеры кодирования сообщений:

Сообщение

Закодированные данные (hex)

CQ R1ABC KO85

00000020587223930748

R2CBA R1ABC R+01

0b136da0587223bfad08

R1ABC R2CBA -20

0b0e4470589b6d1fa7c8

R2CBA R1ABC RR73

0b136da05872239fa4c8

Декодирование стандартного сообщения

Декодирование сообщения осуществляется выполнением в обратной последовательности тех же действий, что и при кодировании.

def ftx_message_decode_std(payload: typing.ByteString) -> typing.Tuple[str, str, str]:
   b29_to = payload[0] << 21
   b29_to |= payload[1] << 13
   b29_to |= payload[2] << 5
   b29_to |= payload[3] >> 3

   b29_de = (payload[3] & 0x07) << 26
   b29_de |= payload[4] << 18
   b29_de |= payload[5] << 10
   b29_de |= payload[6] << 2
   b29_de |= payload[7] >> 6

   r_flag = (payload[7] & 0x20) >> 5

   b16_extra = (payload[7] & 0x1F) << 10
   b16_extra |= payload[8] << 2
   b16_extra |= payload[9] >> 6

   cs_flags = (payload[9] >> 3) & 0x07

   if (call_to := unpack_callsign(b29_to >> 1, bool(b29_to & 1), cs_flags)) is None:
       raise FTXErrorCallSignTo
   if (call_de := unpack_callsign(b29_de >> 1, bool(b29_de & 1), cs_flags)) is None:
       raise FTXErrorCallSignDe
   if (extra := unpack_extra(b16_extra, bool(r_flag & 1))) is None:
       raise FTXErrorGrid
   return call_to, call_de, extra

Функция ftx_message_decode_std принимает на вход строку байт, битовыми операциями извлекает участки данных с закодированными позывными и рапортом/локацией. Далее функция передает извлеченные данные в функцию unpack_callsign и unpack_extra, которые декодируют значения и возвращают данные в строковом виде.

Результатом работы функции является тройка строковых данных, состоящих из вызываемого и вызывающего позывных, а также доп данных в виде локации/лапорта.

Телеметрия и произвольный текст

Протоколы семейства FTX предусматривают возможность отправки данных, не относящихся к стандартному радиолюбительскому обмену, например передавать данные телеметрии. Так как телеметрия представляет собой последовательность байт и максимальный размер передаваемых данных составляет 77 бит, то используя механизм упаковки бит, можно закодировать текстовое сообщение длиной до 12 символов.

FTX_MESSAGE_FREE_TEXT_LEN = 12
FTX_MESSAGE_TELEMETRY_LEN = 9


def ftx_message_encode_free(text: str) -> typing.ByteString:
   if len(text) > FTX_MESSAGE_FREE_TEXT_LEN:
       raise FTXErrorTooLong

   payload = bytearray(b"\x00" * FTX_MESSAGE_TELEMETRY_LEN)
   text = (" " * (FTX_MESSAGE_FREE_TEXT_LEN - len(text))) + text
   for c in text:
       if (cid := nchar(c, FTX_CHAR_TABLE_FULL)) == -1:
           raise FTXErrorInvalidChar

       rem = cid
       for i in reversed(range(FTX_MESSAGE_TELEMETRY_LEN)):
           rem += payload[i] * len(FTX_CHAR_TABLE_FULL)
           payload[i] = byte(rem)
           rem >>= 8

   return ftx_message_encode_telemetry(payload)


def ftx_message_encode_telemetry(payload: typing.ByteString) -> typing.ByteString:
   if len(payload) > FTX_MESSAGE_TELEMETRY_LEN:
       raise FTXErrorTooLong

   carry = 0
   data = bytearray(b"\x00" * len(payload))
   for i, t_byte in enumerate(reversed(payload)):
       data[-i - 1] = byte((carry >> 7) | (t_byte << 1))
       carry = byte(t_byte & 0x80)

   return data

Функция ftx_message_encode_free осуществляет упаковку бит символов из text, используя таблицу FTX_CHAR_TABLE_FULL, в 9-и битный массив байт и передает его для дальнейшего кодирования в ftx_message_encode_telemetry. Перед кодированием исходный текст нормализуется до 12-и символьной строки, путем добавления необходимого количества пробелов в начало текста.

Таким образом, строка «0123456789AB» будет преобразована в массив байт (payload) «000a7271499bcb384a».

Функция ftx_message_encode_telemetry выполняет битовый сдвиг данных для выравнивания по правому краю.

Результатом является массив байт, в каждой ячейке которого значения сдвинуты влево на 1 бит, относительно исходных данных.

Декодирования телеметрии и произвольного текста осуществляется в обратном порядке: принятые данные телеметрии сдвигаются на 1 бит вправо, а текст сообщения раскодируется по таблице FTX_CHAR_TABLE_FULL обратно в строку.

def ftx_message_decode_telemetry(data: typing.ByteString) -> typing.Generator[int, None, None]:
   carry = 0
   for p_byte in data:
       yield byte((carry << 7) | (p_byte >> 1))
       carry = byte(p_byte & 0x01)


def ftx_message_decode_free(data: typing.ByteString) -> str:
   payload = bytearray(ftx_message_decode_telemetry(data))
   text = " "
   for _ in range(FTX_MESSAGE_FREE_TEXT_LEN):
       rem = 0
       for i in range(FTX_MESSAGE_TELEMETRY_LEN):
           rem = (rem << 8) | payload[i]
           payload[i] = byte(rem // 42)
           rem = rem % 42

       text = charn(rem, FTX_CHAR_TABLE_FULL) + text

   return text.strip()

CRC-14

Расчет CRC-14

В качестве проверки целостности сообщения в протоколах семейства FTX предусмотрен расчет 14-и битного циклического избыточного кода (контрольной суммы) — CRC-14. Данный код подтверждает факт целостности данных в принятом сигнале; сигналы, в которых расчетная контрольная сумма сообщения не совпадает с фактической, игнорируются на принимающей стороне.

При 14-и битной контрольной сумме, алгоритм CRC-14 обеспечивает высокую вероятность обнаружения ошибок на коротких сообщениях, и минимально расходует объем передаваемых данных. По сравнению. например, с CRC-8, рассматриваемый алгоритм позволяет находить до 4 бит ошибок, а также имеет более низкую вероятность коллизии (1/16384 против 1/256 для CRC-8). Таким образом, алгоритм CRC-14 используется как компромисс между компактностью и надежностью; алгоритм CRC-8 и близкие к нему, более применимы к совсем коротким сообщениям.

FTX_LDPC_K = 91
FTX_LDPC_M = 83
FTX_LDPC_N = FTX_LDPC_K + FTX_LDPC_M
FTX_LDPC_N_BYTES = ((FTX_LDPC_N + 7) // 8)
FTX_LDPC_K_BYTES = ((FTX_LDPC_K + 7) // 8)


FTX_CRC_POLYNOMIAL = 0x2757
FTX_CRC_WIDTH = 14
FTX_PAYLOAD_BITS = 96
FTX_MESSAGE_BITS = FTX_PAYLOAD_BITS - FTX_CRC_WIDTH
TOPBIT = 1 << (FTX_CRC_WIDTH - 1)


def ftx_compute_crc(message: typing.ByteString, num_bits: int) -> int:
   remainder = 0
   idx_byte = 0

   for idx_bit in range(num_bits):
       if idx_bit % 8 == 0:
           remainder ^= message[idx_byte] << (FTX_CRC_WIDTH - 8)
           idx_byte += 1

       if remainder & TOPBIT != 0:
           remainder = (remainder << 1) ^ FTX_CRC_POLYNOMIAL
       else:
           remainder = remainder << 1

   return remainder & ((TOPBIT << 1) - 1)


def ftx_add_crc(payload: typing.ByteString) -> typing.ByteString:
   message = payload + (b"\x00" * (FTX_LDPC_K_BYTES - len(payload)))
   message[-3] &= 0xf8
   message[-2] = 0

   checksum = ftx_compute_crc(message, FTX_MESSAGE_BITS)

   message[-3] |= byte(checksum >> 11)
   message[-2] = byte(checksum >> 3)
   message[-1] = byte(checksum << 5)

   return message

Функция ftx_compute_crc реализует алгоритм вычисления CRC-14. Полином, для вычисления остатка от деления данных, определен в FTX_CRC_POLYNOMIAL со значением 0x2757 (0b10011101010111).

Задача функции ftx_add_crc рассчитать и добавить к исходным данных значение функции ftx_compute_crc; предварительно данные подгоняются под длину сообщения в 12 байт (98 бит), контрольная сумма записывается в три последних байта сообщения.

Пример контрольной суммы для данных 00000020587223930748 (закодированное сообщение CQ R1ABC KO85): 11173 (0x2ba5); результат добавления контрольной суммы к данным: 0000002058722393074d74a0.

Проверка CRC-14

Так как принимаемое сообщение представляет собой массив байт, в котором содержатся сами данные сообщения и контрольный проверочный код CRC-14, при проверке достаточно отделить контрольную сумму от данных, выполнить пересчет CRC-14 и сравнить полученный результат с принятой контрольной суммой.

def ftx_crc(msg1: typing.ByteString, msglen: int) -> typing.ByteString:
   div = [1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1]

   msg = bytearray(b"\x00" * (FTX_LDPC_M + FTX_CRC_WIDTH))
   for i in range(msglen + FTX_CRC_WIDTH):
       if i < 77:
           msg[i] = msg1[i]

   for i in range(msglen):
       if msg[i] != 0:
           for j, d in enumerate(div):
               msg[i + j] = msg[i + j] ^ d

   return msg[msglen:msglen + FTX_CRC_WIDTH]


def ftx_check_crc(a91: typing.ByteString) -> bool:
   out1 = ftx_crc(a91, 82)
   for i, b in enumerate(out1):
       if b != a91[FTX_LDPC_K - FTX_CRC_WIDTH + i]:
           return False
   return True

Функция ftx_check_crc осуществляет проверку CRC-14 в блоке данных a91, извлекая 82 бита данных из проверяемого 91 битного блока данных и сверяет совпадение CRC-14 данных с контрольным кодом, находящимся в хвосте a91.

Результатом функции является булево значение.

LDPC (низкоплотностный код)

LDPC (Low-Density Parity-Check code) — код с малой плотностью проверок на чётность. Алгоритм используется для исправления ошибок, возникающих при передаче данных, повышая общую помехоустойчивость.

Более подробное описание теории и принципов работы LDPC достаточно объемное и выходит за рамки статьи.

С подробностями устройства и работы LDPC можно ознакомиться здесь:

  • https://en.wikipedia.org/wiki/Low-density_parity-check_code

  • https://habr.com/ru/articles/453086/

  • https://habr.com/ru/articles/830150/

Кодирование LDPC

LDPC-код относится к категории линейных блочных кодов, определяемых матрицей проверки на четность с небольшим количеством единиц; таким образом, в матрице, используемой при кодировании, количество единиц относительно невелико по сравнению с общим количеством элементов в матрице.

Рисунок 2: Граф Таннера и матрица проверок LDPC.

На рисунке 2 приведена схема представления работы LDPC-кодов, в частности LDPC-код (12, 8); в кругах, пронумерованных от 1 до 8 обозначены данные исходного сообщения, в квадратах с номерами от 1 до 4 — биты четности с такими значениями, чтобы результат сложения бит данных с битом четности, по модулю 2, был равен нулю.

Проверочная матрица H в столбцах определяет то, с какими битами исходного сообщения связан бит четности (так, на рисунке линиями первый бит четности соединен с 1, 4,5, 7 и 8 битами сообщения, что соответствует первой строке матрице в виде записи [1 0 0 1 1 0 1 1]); количество строк матрицы определяет то, сколько битов четности будет рассчитано. Также проверочная матрица H должна обладать свойствами разреженности и нерегулярности.

Таким образом, зная дополнительные биты четности и структуру проверочной матрицы, повторно выполнив суммирование бит по модулю 2, можно проверить корректность данных и, в случае искажения данных, локализовать и устранить ошибку, так как на один бит данных может ссылаться несколько битов проверки четности. Таким же образом осуществляется определение синдрома ошибки в самих проверочных битах.

Протоколы семейства FTX используют LDPC(174, 87), то есть к исходным данным добавляется 87 бит с проверкой на четность.

Таким образом, на принимающей стороне, при потере или искажении части сообщения, сигнал можно будет восстановить используя избыточные биты рассчитанные при LDPC-кодировании.

FTX_LDPC_GENERATOR = [
   [0x83, 0x29, 0xce, 0x11, 0xbf, 0x31, 0xea, 0xf5, 0x09, 0xf2, 0x7f, 0xc0],
   [0x76, 0x1c, 0x26, 0x4e, 0x25, 0xc2, 0x59, 0x33, 0x54, 0x93, 0x13, 0x20],
   [0xdc, 0x26, 0x59, 0x02, 0xfb, 0x27, 0x7c, 0x64, 0x10, 0xa1, 0xbd, 0xc0],
   ...
   [0x60, 0x8c, 0xc8, 0x57, 0x59, 0x4b, 0xfb, 0xb5, 0x5d, 0x69, 0x60, 0x00]
]


def parity8(x: int) -> int:
   for i in [4, 2, 1]:
       x ^= x >> i
   return byte(x % 2)


def ftx_encode(message: typing.ByteString) -> typing.ByteString:
   codeword = bytearray(message[i] if i < FTX_LDPC_K_BYTES else 0 for i in range(FTX_LDPC_N_BYTES))

   col_mask = 0x80 >> (FTX_LDPC_K % 8)
   col_idx = FTX_LDPC_K_BYTES - 1

   for i in range(FTX_LDPC_M):
       nsum = 0
       for j in range(FTX_LDPC_K_BYTES):
           bits = message[j] & FTX_LDPC_GENERATOR[i][j]
           nsum ^= parity8(bits)

       if nsum % 2:
           codeword[col_idx] |= col_mask

       col_mask >>= 1
       if col_mask == 0:
           col_mask = 0x80
           col_idx += 1

   return codeword

Проверочная матрица определена в FTX_LDPC_GENERATOR, так как она достаточно большая, чтобы целиком приводить ее в тексте, приведены только начало и конец матрицы. Саму матрицу целиком можно посмотреть здесь.

Функция ftx_encode осуществляет LDPC кодирование данных, переданных в параметре message; результатом функции является 174 битный код, содержащий в себе исходные данные из message, включая 87 LDPC-бит.

Функция parity8 осуществляет побитовую операцию XOR для расчета бита четности.

Пример:

message: 0000002058722393074d74a0

ftx_encode: 0000002058722393074d74a67d749e15d81ecea9e3a0

Декодирование LDPC

Один из популярных алгоритмов декодирования LDPC-кодов — алгоритм Belief Propagation (алгоритм распространения доверия).

Каждая итерация алгоритма состоит из двух шагов:

  1. узлы данных передают сообщения узлам проверок;

  2. проверочные узлы передают сообщения узлам данных.

Итерации повторяются до достижения приемлемого количества ошибок, либо до достижения максимального количества итераций.

FTX_LDPC_NM = [
   [4, 31, 59, 91, 92, 96, 153],
   ...
   [17, 42, 75, 129, 170, 172, 0],
]


FTX_LDPC_MN = [
   [16, 45, 73],
   ...
   [42, 49, 57],
]


FTX_LDPC_NUM_ROWS = [
   7, 6, 6, 6, 7, 6, 7, 6, 6, 7, 6, 6, 7, 7, 6, 6,
   ...
   6, 6, 6
]


def fast_tanh(x: float) -> float:
   if x < -4.97:
       return -1.0
   if x > 4.97:
       return 1.0
   x2 = x ** 2
   a = x * (945.0 + x2 * (105.0 + x2))
   b = 945.0 + x2 * (420.0 + x2 * 15.0)
   return a / b


def fast_atanh(x: float) -> float:
   x2 = x ** 2
   a = x * (945.0 + x2 * (-735.0 + x2 * 64.0))
   b = (945.0 + x2 * (-1050.0 + x2 * 225.0))
   return a / b


def ldpc_check(codeword: bytes) -> int:
   errors = 0
   for m in range(FTX_LDPC_M):
       x = 0
       for i in range(FTX_LDPC_NUM_ROWS[m]):
           x ^= codeword[FTX_LDPC_NM[m][i] - 1]
       if x:
           errors += 1
   return errors


def bp_decode(codeword: typing.List[float], max_iters: int) -> typing.Tuple[int, typing.ByteString]:
   min_errors = FTX_LDPC_M

   tov = [[0.0] * 3 for _ in range(FTX_LDPC_N)]
   toc = [[0.0] * 7 for _ in range(FTX_LDPC_M)]

   plain = bytearray(b"\x00" * FTX_LDPC_N)

   for _ in range(max_iters):
       plain_sum = 0
       for n in range(FTX_LDPC_N):
           plain[n] = int((codeword[n] + tov[n][0] + tov[n][1] + tov[n][2]) > 0)
           plain_sum += plain[n]

       if plain_sum == 0:
           min_errors = FTX_LDPC_M
           break

       if (errors := ldpc_check(plain)) < min_errors:
           min_errors = errors
           if errors == 0:
               break

       for m in range(FTX_LDPC_M):
           for n_idx in range(FTX_LDPC_NUM_ROWS[m]):
               n = FTX_LDPC_NM[m][n_idx] - 1
               Tnm = codeword[n]
               for m_idx in range(3):
                   if (FTX_LDPC_MN[n][m_idx] - 1) != m:
                       Tnm += tov[n][m_idx]
               toc[m][n_idx] = fast_tanh(-Tnm / 2)

       for n in range(FTX_LDPC_N):
           for m_idx in range(3):
               m = FTX_LDPC_MN[n][m_idx] - 1
               Tmn = 1.0
               for n_idx in range(FTX_LDPC_NUM_ROWS[m]):
                   if (FTX_LDPC_NM[m][n_idx] - 1) != n:
                       Tmn *= toc[m][n_idx]
               tov[n][m_idx] = -2 * fast_atanh(Tmn)

   return min_errors, plain


def ftx_normalize_logl(log174: typing.List[float]) -> typing.Generator[float, None, None]:
   sum = 0
   sum2 = 0
   for it in log174:
       sum += it
       sum2 += it ** 2

   inv_n = 1.0 / FTX_LDPC_N
   variance = (sum2 - (sum * sum * inv_n)) * inv_n

   norm_factor = math.sqrt(24.0 / variance)

   for it in log174:
       yield it * norm_factor

Функция bp_decode реализует алгоритм Belief Propagation, в параметр codeword передается нормализованные функцией ftx_normalize_logl данные для декодирования; параметр max_iters определяет максимальное количество итераций на попытку раскодировать данные.

Результатом работы функции является кортеж, содержащий в себе счетчик ошибок и набор байт декодированных данных.

Матрица FTX_LDPC_NM содержит строки проверки четности, каждое значение — индекс в кодовом слове.

Матрица FTX_LDPC_MN описывает биты кодового слова, числа определяют какие три проверки четности из FTX_LDPC_NM относятся к кодовому слову.

Размеры матриц слишком большие чтобы полностью рассмотреть их в рамках статьи и доступны по ссылкам:

Примечание: рассматриваемый пример слабо оптимизирован и имеет высокую алгоритмическую сложность, на Python работает достаточно медленно на сильно зашумленных данных.

Транспортный уровень

На транспортном уровне в протоколах семейства FTX происходит перекодирование бит данных в код Грея и добавление данных матриц Костаса в качестве маркеров сигнала.

Код Грея

Код Грея (Gray code) — двоичный, зеркальный код, в котором две соседние комбинации отличаются одной цифрой (расстояние Хемминга между соседними комбинациями кода равно единице).

Пример записи кода Грея для размерностей 1, 2 и 3 приведен на рисунке 3, также можно заметить симметричность кода, из-за чего код и получил название «зеркальный код».

Рисунок 3: Коды Грея размерностей 1, 2 и 3, и его свойство зеркальности.

Рисунок 4: Круговой энкодер с кодом Грея и его развернутое представление.

На рисунке 4 приведен пример использования кода Грея в энкодере.

Более подробно про код Грея можно прочитать здесь.

Исходя из этих свойств, код Грея позволяет снизить вероятность ошибки при изменении значений без сигналов тактирования.

В протоколах семейства FTX код Грея также применяется для повышения помехоустойчивости, снижая вероятность ошибки на принимающей стороне при декодировании, таким образом, приемник сможет исправить ошибку, так как соседние значения будут отличаться она один бит.

Для FT8 используется трех битный код Грея, для FT4 — двух.

Алгоритм расчета кода Грея:

def bin_to_gray(b: int) -> int:
   return b ^ (b >> 1)


for i in range(10):
   print(f"{i}, {i:5b}, {bin_to_gray(i):5b}")

Результат расчета:

dec, bin,   gray
 0, 00000, 00000
 1, 00001, 00001
 2, 00010, 00011
 3, 00011, 00010
 4, 00100, 00110
 5, 00101, 00111
 6, 00110, 00101
 7, 00111, 00100
 8, 01000, 01100
 9, 01001, 01101

Перевод в код Грея

Исходя из таблицы, коды Грея можно представить в виде списков, где значения индексов списка будут совпадать с представлением числа в формате Грея:

FT8_GRAY_MAP = [0, 1, 3, 2, 5, 6, 4, 7]
FT4_GRAY_MAP = [0, 1, 3, 2]

Таким образом, задача перевода данных из двоичного кода в код Грея реализуется кодом:

mask = 0x80
i_byte = 0
bits3 = 0


for bit_or in [4, 2, 1]:
   if codeword[i_byte] & mask:
       bits3 |= bit_or

   mask >>= 1
   if mask == 0:
       mask = 0x80
       i_byte += 1

yield FT8_GRAY_MAP[bits3]
mask = 0x80
i_byte = 0
bits2 = 0


for bit_or in [2, 1]:
   if codeword[i_byte] & mask:
       bits2 |= bit_or

   mask >>= 1
   if mask == 0:
       mask = 0x80
       i_byte += 1

yield FT4_GRAY_MAP[bits2]

Где bits3 и bits2 — биты исходного сообщения codeword.

Перевод из кода Грея

Декодирование в протоколах семейства FTX осуществляется перебором, путем выделения из сигнала тех бит, амплитуды которых в сигнале максимальны, они, в свою очередь, дадут представление о битах в двоичной форме.

def ft4_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float]:
   s2 = [self.wf.mag[mag_idx + gc] for gc in FT4_GRAY_MAP]

   logl_0 = max(s2[2], s2[3]) - max(s2[0], s2[1])
   logl_1 = max(s2[1], s2[3]) - max(s2[0], s2[2])
   return logl_0, logl_1


def ft8_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float, float]:
   s2 = [self.wf.mag[mag_idx + gc] for gc in FT8_GRAY_MAP]

   logl_0 = max(s2[4], s2[5], s2[6], s2[7]) - max(s2[0], s2[1], s2[2], s2[3])
   logl_1 = max(s2[2], s2[3], s2[6], s2[7]) - max(s2[0], s2[1], s2[4], s2[5])
   logl_2 = max(s2[1], s2[3], s2[5], s2[7]) - max(s2[0], s2[2], s2[4], s2[6])
   return logl_0, logl_1, logl_2

Функции ft4_extract_symbol и ft8_extract_symbol принадлежат классу Monitor, инкапсулирующего в себе логику анализа и декодирования данных (данный тезис будет более понятен при чтении статьи от конца к началу). Параметр mag_idx — номер корзины в спектре FFT.

Результат работы функции — кортеж из амплитудных значений бит данных.

Массив Костаса

Массив Костаса (Costas array) — тип решетки точек n*n, в которой ни одна пара точек, не имеет одинакового вектора смещения; иными словами в массиве существует только одна точка в строке и одна точка в столбце и расстояние между любыми двумя парами точек уникально, тем самым, два массива Костаса с одинаковыми значениями после операций параллельного переноса и транспонирования могут быть взаимно симметричны только тогда, когда полностью совпадают расположения точек в обоих массивах.

Исходя из этого, автокорреляционная функция (АКФ) по точкам из массива Костаса стремится к нулю, тем самым минимизируя ложные совпадения и взаимные помехи.

Рисунок 5: Примеры сигналов и графики их АКФ.

На рисунке 5 приведены примеры функций и графики их АКФ; при высоком уровне автокорреляций, огибающая графика имеет плавный рост и убывание на всей длине сигнала (рисунок 5, график A), а при низкой автокорреляции заметного роста не происходит, но, когда сигнал совпадает сам с собой, возникает резкий всплеск, например, в случае с белым шумом (рисунок 5, график D), другими словами при высокой автокорреляции функции градиент плавный, при низкой — резкий.

В протоколах семейства FTX массивы костаса используются в качестве маркеров синхронизации, решая задачу выделения полезных сигналов при наличии интерференций и шума.

Принцип применения массива Костаса можно рассмотреть на примере.

Допустим, имеется диагональная матрица из светодиодов и непрозрачная маска с отверстиями, которые расположены на тех же местах, что и светодиоды (рисунок 6), таким образом, перемещая маску по вертикали и по горизонтали можно добиться полного совпадения отверстий в маске со светодиодами (рисунок 7.2); однако, если один из светодиодов будет погашен (тем самым имитируя частичную потерю или искажение данных), то при такой конфигурации маски и светодиодов возникает неопределенность, когда невозможно точно определить находится ли найденный массив в начале или конце маски (рисунок 7.1).

Рисунок 6: Маска с отверстиями и диодная матрица.

Рисунок 7: 1: Состояние неопределенности; 2: Поиск диодов по маске.

Конфигурация маски и светодиодной матрицы с расположением отверстий и светодиодов согласно массиву Костаса 3*3 решает проблему неопределенности (рисунок 8), таким образом, если один из светодиодов будет выключен, то оставшихся двух будет достаточно для выравнивания маски, избегая состояния неопределенности.

Рисунок 8: маска и светодиодная матрица сконфигурированные по массиву Костаса.

Использование массива Костаса также обеспечивает точность позиционирования, если данные скомпрометированы, например один из светодиодов будет расположен значительно выше или ниже ожидаемого положения.

При использовании массива Костаса 3*3 состояние неопределенности наступит вновь, когда будут отключены два или более светодиодов.

Исходя из описанных свойств и обеспечения точности синхронизации сигналов во времени и частоте, массивы Костаса нашли применение в гидро и радиолокации.

Внедрение массива Костаса в сигнал

В протоколе FT8 используется массив Костаса 7*7, со значениями [3, 1, 4, 0, 6, 5, 2] и записывается в начало, середину и конец сообщения.

В отличие от FT8, протокол FT4 использует уже 4 массива Костаса 4*4 со значениями: [[0, 1, 3, 2], [1, 0, 2, 3], [2, 3, 1, 0], [3, 2, 0, 1]].

Примечание: причина выбора массива Костаса со значениями [3, 1, 4, 0, 6, 5, 2] до конца не понятна, но возможно это отсылка, по причине схожести первых трех чисел из числа пи.

FT8_NN = 79
FT4_NN = 105


FT8_COSTAS_PATTERN = [3, 1, 4, 0, 6, 5, 2]
FT4_COSTAS_PATTERN = [
   [0, 1, 3, 2],
   [1, 0, 2, 3],
   [2, 3, 1, 0],
   [3, 2, 0, 1]
]


FT4_XOR_SEQUENCE = [
   0x4A,  # 01001010
   0x5E,  # 01011110
   0x89,  # 10001001
   0xB4,  # 10110100
   0xB0,  # 10110000
   0x8A,  # 10001010
   0x79,  # 01111001
   0x55,  # 01010101
   0xBE,  # 10111110
   0x28,  # 00101 [000]
]


def ft8_encode(payload: typing.ByteString) -> typing.Generator[int, None, None]:
   a91 = ftx_add_crc(payload)
   codeword = ftx_encode(a91)

   mask = 0x80
   i_byte = 0
   for i_tone in range(FT8_NN):
       if 7 > i_tone >= 0:
           yield FT8_COSTAS_PATTERN[i_tone]
       elif 43 > i_tone >= 36:
           yield FT8_COSTAS_PATTERN[i_tone - 36]
       elif 79 > i_tone >= 72:
           yield FT8_COSTAS_PATTERN[i_tone - 72]
       else:
           bits3 = 0
           for bit_or in [4, 2, 1]:
               if codeword[i_byte] & mask:
                   bits3 |= bit_or

               mask >>= 1
               if mask == 0:
                   mask = 0x80
                   i_byte += 1

           yield FT8_GRAY_MAP[bits3]


def ft4_encode(payload: typing.ByteString) -> typing.Generator[int, None, None]:
   payload_xor = bytearray(b"\x00" * 10)
   for i in range(10):
       payload_xor[i] = payload[i] ^ FT4_XOR_SEQUENCE[i]

   a91 = ftx_add_crc(payload_xor)
   codeword = ftx_encode(a91)

   mask = 0x80
   i_byte = 0
   for i_tone in range(FT4_NN):
       if i_tone == 0 or i_tone == 104:
           yield 0
       elif 5 > i_tone >= 1:
           yield FT4_COSTAS_PATTERN[0][i_tone - 1]
       elif 38 > i_tone >= 34:
           yield FT4_COSTAS_PATTERN[1][i_tone - 34]
       elif 71 > i_tone >= 67:
           yield FT4_COSTAS_PATTERN[2][i_tone - 67]
       elif 104 > i_tone >= 100:
           yield FT4_COSTAS_PATTERN[3][i_tone - 100]
       else:
           bits2 = 0
           for bit_or in [2, 1]:
               if codeword[i_byte] & mask:
                   bits2 |= bit_or

               mask >>= 1
               if mask == 0:
                   mask = 0x80
                   i_byte += 1

           yield FT4_GRAY_MAP[bits2]

Генераторная функция ft8_encode вызывает ранее рассмотренные функции вычисления контрольных сумм и LDPC кодирования, после чего формирует последовательность по схеме: S7 D29 S7 D29 S7, где S — данные массива Костаса, D — данные сообщения после ранее преобразованные в код Грея. Выходными данными функции являются номера GFSK-тонов.

Пример данных, сформированные функцией ft8_encode: 3140652 00000000100651431071150732373 3140652 35427373324062650244263575260 3140652 (блоки данных и массивы Костаса разделены пробелами).

Работа генераторной функции ft4_encode аналогична работе функции ft8_encode, с той лишь разницей, что кодируемые данные предварительно подвергаются операции XOR в соответствии со схемой FT4_XOR_SEQUENCE, такое повышение энтропии необходимо для того, чтобы свести к минимуму ситуацию, когда в сообщении идет друг за другом большое количество нулей, что может негативно повлиять на помехоустойчивость.

Данные FT4 до и после операции XOR: 00000020587223930748 / 4a5e8994e8f85ac6b960.

Формирование результата осуществляется по схеме: R S4_1 D29 S4_2 D29 S4_3 D29 S4_4 R, где R — ramping symbol, равный 0.

Пример данных, сформированные функцией ft4_encode: 0 0132 10331123303131102330223011332 1023 01332311302112123233233113233 2310 30302303033330213121320010313 3201 0 (блоки данных и массивы Костаса разделены пробелами)

FT8_NN, FT4_NN — общее количество тонов в последовательности сигнала FT8/FT4.

Поиск сигнала по массиву Костаса

В протоколах семейства FTX при детекции сигналов в общей полосе производится обход спектра временного окна и скоринг сигнала по амплитудам частот, расположенных согласно значениям из массива Костаса. При достижении порогового значения скоринга, полоса частот во временном промежутке принимается в расчет для последующего декодирования сигнала.

@dataclass
class Candidate:
   time_offset: int
   freq_offset: int
   time_sub: int
   freq_sub: int
   score: int = 0

Класс Candidate для хранения информации о положении сигнала и его скор-балла в общем сигнале.

time_offset — индекс блока времени;

freq_offset — индекс блока частот;

time_sub — подиндекс блока времени при оверсемплинге;

freq_sub — подиндекс блока частот при оверсемплинге.

def get_cand_mag_idx(self, candidate: Candidate) -> int:
   wf = self.wf

   offset = candidate.time_offset
   offset = offset * wf.time_osr + candidate.time_sub
   offset = offset * wf.freq_osr + candidate.freq_sub
   offset = offset * wf.num_bins + candidate.freq_offset

   return offset


def ft8_sync_score(self, candidate: Candidate) -> int:
   wf = self.wf

   score = 0
   num_average = 0

   mag_cand = self.get_cand_mag_idx(candidate)

   for m in range(FT8_NUM_SYNC):
       for k in range(FT8_LENGTH_SYNC):
           block = FT8_SYNC_OFFSET * m + k
           block_abs = candidate.time_offset + block
           if block_abs < 0:
               continue
           if block_abs >= wf.num_blocks:
               break

           p8 = mag_cand + block * wf.block_stride
           sm = FT8_COSTAS_PATTERN[k]
           p8sm = p8 + sm
           if sm > 0:
               score += wf.mag[p8sm] - wf.mag[p8sm - 1]
               num_average += 1
           if sm < 7:
               score += wf.mag[p8sm] - wf.mag[p8sm + 1]
               num_average += 1
           if k > 0 and block_abs > 0:
               score += wf.mag[p8sm] - wf.mag[p8sm - wf.block_stride]
               num_average += 1
           if k + 1 < FT8_LENGTH_SYNC and block_abs + 1 < wf.num_blocks:
               score += wf.mag[p8sm] - wf.mag[p8sm + wf.block_stride]
               num_average += 1

   if num_average > 0:
       score = int(score / num_average)
   return score


def ft4_sync_score(self, candidate: Candidate) -> int:
   wf = self.wf
   score = 0
   num_average = 0

   mag_cand = self.get_cand_mag_idx(candidate)

   for m in range(FT4_NUM_SYNC):
       for k in range(FT4_LENGTH_SYNC):
           block = 1 + (FT4_SYNC_OFFSET * m) + k
           block_abs = candidate.time_offset + block

           if block_abs < 0:
               continue
           if block_abs >= wf.num_blocks:
               break

           p4 = mag_cand + (block * wf.block_stride)
           sm = FT4_COSTAS_PATTERN[m][k]
           p4sm = p4 + sm

           if sm > 0:
               score += wf.mag[p4sm] - wf.mag[p4sm - 1]
               num_average += 1
           if sm < 3:
               score += wf.mag[p4sm] - wf.mag[p4sm + 1]
               num_average += 1
           if k > 0 and block_abs > 0:
               score += wf.mag[p4sm] - wf.mag[p4sm - wf.block_stride]
               num_average += 1
           if k + 1 < FT4_LENGTH_SYNC and block_abs + 1 < wf.num_blocks:
               score += wf.mag[p4sm] - wf.mag[p4sm + wf.block_stride]
               num_average += 1

   if num_average > 0:
       score = int(score / num_average)
   return score


def ftx_sync_score(self, candidate: Candidate) -> int:
   wf = self.wf

   if wf.protocol == FTX_PROTOCOL_FT4:
       sync_fun = self.ft4_sync_score
   elif wf.protocol == FTX_PROTOCOL_FT8:
       sync_fun = self.ft8_sync_score
   else:
       raise ValueError("Invalid protocol")

   return sync_fun(candidate)


def ftx_find_candidates(self, num_candidates: int, min_score: int) -> typing.List[Candidate]:
   wf = self.wf

   num_tones = FTX_TONES_COUNT[wf.protocol]
   time_offset_range = range(-FTX_LENGTH_SYNC[wf.protocol], int(FTX_TIME_RANGE[wf.protocol]))

   heap = []
   can = Candidate(0, 0, 0, 0)
   for time_sub in range(wf.time_osr):
       for freq_sub in range(wf.freq_osr):
           for time_offset in time_offset_range:
               for freq_offset in range(wf.num_bins - num_tones):
                   can.time_sub = time_sub
                   can.freq_sub = freq_sub
                   can.time_offset = time_offset
                   can.freq_offset = freq_offset

                   if (score := self.ftx_sync_score(can)) < min_score:
                       continue

                   candidate = copy(can)
                   candidate.score = score

                   heap.insert(0, candidate)

   heap.sort(key=lambda x: x.score, reverse=True)
   return heap[:num_candidates]

Вспомогательная функция get_cand_mag_idx переводит параметры частоты и времени кандидата в позицию в матрице амплитуд сигнала.

Функция ftx_find_candidates осуществляет подбор и скоринг кандидатов, перебирая частоты и временные временные отметки, в которых потенциально может быть обнаружен сигнал. Кандидат передается в функцию ftx_sync_score для анализа, если удается обнаружить признаки сигналов по массиву Костаса, то значение скор-балла увеличивается.

Результатом работы функции является список кандидатов (heap), отсортированный по признаку score, исключая кандидатов, чей скор-балл ниже значения в min_score; размер списка ограничен размером num_candidates.

Функция ftx_sync_score, в зависимости от протокола, FT8 или FT4, передает управление функциям ft4_sync_score и ft8_sync_score, которые в свою очередь осуществляют анализ сигнала на наличие маркеров-массивов Костаса.

Функции ft8_sync_score и ft4_sync_score реализуют обход участка частот кандидата, согласно расположениям массивов Костаса (позиции 0-7, 36-43, 72-79 для FT8 и 1-4, 34-37, 67-70, 100-103 для FT4). В значении score суммируется разность амплитуд между искомыми и соседними частотами, иными словами, чем резче градиент амплитуд в полосе, тем выше скор-балл кандидата. Переменная wf.mag (ссылка на self.wf) — матрица амплитуд (WFFT) принятого сигнала.

Канальный уровень

На канальном уровне происходит синтезирование сигнала, на основе данных, сформированным на предыдущем уровне. В результате формируется дискретный сигнал, готовый для передачи по аналоговому (звуковому) каналу связи.

При приеме сигнала, на этом уровне осуществляется оконное преобразование Фурье и вычисление амплитуд.

FSK и GFSK формирование сигнала

FSK (Frequency Shift Keying) — вид модуляции (манипуляции), при котором информация кодируется изменением несущей частоты сигнала. Для кодирования информации может использоваться две или более частот, где каждому состоянию соответствует та или иная частота.

На рисунке 9 изображено представление сигнала с использованием FSK для бинарных данных, в частности единицам соответствует высокая частота, а нулям меньшая.

Рисунок 9: FSK для двоичных данных и вид сигнала.

Стоит обратить внимание на то, что в рассматриваемом примере фаза непрерывна, то есть не имеет скачков.

GFSK (Gaussian Frequency-Shift Keying) — подвид FSK манипуляции при которой используется фильтр Гаусса (рисунок 10) для сглаживания частотных перестроек.

Рисунок 10: Импульсная переходная функция фильтра Гаусса.

Принцип модуляции GFSK идентичен обычному FSK, с той разницей, что формируемые импульсы тонов предварительно проходят через фильтр Гаусса для сглаживания (плавно перестраивает фазу сигнала в начале и конце тона), что позволяет уменьшить ширину спектра сигнала, так как резкая перестройка частоты на спектрограмме выглядит как широкополосный выброс (рисунок 13).

Рисунок 11: Сравнение FSK и GFSK.

На рисунках 11 и 12 показано сравнение сигналов и спектров обычного FSK и того же сигнала с использовании фильтра Гаусса (GFSK).

Фильтр Гаусса описывается уравнением 1.

(1)

Где

, а BT — коэффициент сглаживания,чем он больше, тем меньше сглаживание исходного импульса (рисунок 12).

Рисунок 12: Сравнение сглаживаний при изменении параметра BT.

Рисунок 13: Сравнение спектров FSK и GFSK сигналов.

В протокол FT8 используется ширина сглаживания BT равная 2.0, в протоколе FT4 — 1.0.

import math
import typing


FT8_SYMBOL_PERIOD = 0.160
FT4_SYMBOL_PERIOD = 0.048


FT8_SYMBOL_BT = 2.0
FT4_SYMBOL_BT = 1.0


GFSK_K = math.pi * math.sqrt(2 / math.log(2))


def gfsk_pulse(n_spsym: int, symbol_bt: float) -> typing.Generator[float, None, None]:
   for i in range(3 * n_spsym):
       t = i / n_spsym - 1.5
       arg1 = GFSK_K * symbol_bt * (t + 0.5)
       arg2 = GFSK_K * symbol_bt * (t - 0.5)
       val = (math.erf(arg1) - math.erf(arg2)) / 2

       yield val


def synth_gfsk(symbols: typing.List[int], n_sym: int,
              f0: float,
              symbol_bt: float, symbol_period: float,
              signal_rate: int) -> typing.Generator[float, None, None]:
   n_spsym = int(0.5 + signal_rate * symbol_period)
   n_wave = n_sym * n_spsym
   hmod = 1.0

   dphi_peak = 2 * math.pi * hmod / n_spsym
   dphi = [2 * math.pi * f0 / signal_rate] * (n_wave + 2 * n_spsym)

   pulse = list(gfsk_pulse(n_spsym, symbol_bt))

   for i in range(n_sym):
       ib = i * n_spsym
       for j in range(3 * n_spsym):
           dphi[j + ib] += dphi_peak * symbols[i] * pulse[j]

   for j in range(2 * n_spsym):
       dphi[j] += dphi_peak * pulse[j + n_spsym] * symbols[0]
       dphi[j + n_sym * n_spsym] += dphi_peak * pulse[j] * symbols[n_sym - 1]

   phi = 0.0
   n_ramp = n_spsym // 8
   for k in range(n_wave):
       val = math.sin(phi)
       phi = math.fmod(phi + dphi[k + n_spsym], 2 * math.pi)

       if k < n_ramp or k >= n_wave - n_ramp:
           i_ramp = (k if k < n_ramp else n_wave - k - 1)
           env = (1 - math.cos(2 * math.pi * i_ramp / (2 * n_ramp))) / 2
           val *= env

       yield val

Генераторная функция gfsk_pulse рассчитывает значения сглаживающего импульса.

Параметр n_spsym — количество семплов на символ; параметр symbol_bt — крутизна сглаживания (параметр BT); результат — генератор со значениями типа float.

Генераторная функция synth_gfsk формирует дискретный сигнал, с частотой дискретизации задаваемой параметром signal_rate, на вход передается список тонов в параметре symbols; параметр n_sym определяет размер множества символов (значение FT4_NN/FT8_NN); параметр f0 определяет базовую частоту, относительно которой будет будут рассчитываться частоты тонов; symbol_bt — крутизна фильтра Гаусса (FT4_SYMBOL_BT/FT8_SYMBOL_BT); параметр symbol_period определяет длительность звучания каждого тона в секундах (FT4_SYMBOL_PERIOD/FT8_SYMBOL_PERIOD); функция генерирует последовательность дискретных значений от -1 до 1 используя функцию синус (sin), в качестве аргумента которого передается рассчитанная фаза сигнала.

pulse = list(gfsk_pulse(n_spsym, symbol_bt))

В pulse формируется список дискретных значений сглаживающей функции Гаусса.

При формировании сигнала дополнительно происходит сглаживание амплитуд в начале и конце сигнала оконной функцией Ханна:

if k < n_ramp or k >= n_wave - n_ramp:
   i_ramp = (k if k < n_ramp else n_wave - k - 1)
   env = (1 - math.cos(2 * math.pi * i_ramp / (2 * n_ramp))) / 2
   val *= env

Таким образом, результатом работы функций канального уровня, является дискретный звуковой сигнал, готовый к отправке в ЦАП передатчика.

Общий вид кодера и генератора сигнала FT8/FT4:

import numpy as np
from scipy.io.wavfile import write


FT4_SLOT_TIME = 7.5
FT8_SLOT_TIME = 15.0


def main():
   try:
       call = "CQ R1ABC KO85"
       payload = ftx_message_encode_std(*call.split())
   except Exception as e:
       print(f"Cannot parse message: {type(e)}")
       return

   print("Payload", ''.join('{:02x}'.format(x) for x in payload))

   is_ft4 = False
   if is_ft4:
       tones = ft4_encode(payload)
   else:
       tones = ft8_encode(payload)

   tones = list(tones)

   print("FSK tones:", "".join(str(i) for i in tones))

   frequency = 1000

   symbol_period = FT4_SYMBOL_PERIOD if is_ft4 else FT8_SYMBOL_PERIOD
   symbol_bt = FT4_SYMBOL_BT if is_ft4 else FT8_SYMBOL_BT

   sample_rate = 12000
   num_tones = FT4_NN if is_ft4 else FT8_NN
   slot_time = FT4_SLOT_TIME if is_ft4 else FT8_SLOT_TIME

   num_samples = int(0.5 + num_tones * symbol_period * sample_rate)
   num_silence = int((slot_time * sample_rate - num_samples) / 2)

   signal = np.fromiter(synth_gfsk(tones, num_tones, frequency, symbol_bt, symbol_period, sample_rate), dtype=float)
   silence = np.zeros(num_silence)
   amplitude = np.iinfo(np.int16).max
   data = np.concat([silence, amplitude * signal, silence])
   write("examples/signal.wav", sample_rate, data.astype(np.int16))


if __name__ == '__main__':
   main()

Приведенный код формирует из строки «CQ R1ABC KO85» осуществляет генерацию сигнала протокола FT8 и сохраняет сигнал в файл с именем signal.wav.

Изменение значения переменной is_ft4 на True изменит логику генератора на формирование сигнала протокола FT4.

Результат вывода данных для FT8:

Payload 00000020587223930748
FSK tones: 3140652000000001006514310711507323733140652354273733240626502442635752603140652

Результат вывода данных для FT4:

Payload 00000020587223930748
FSK tones: 001321033112330313110233022301133210230133231130211212323323311323323103030230303333021312132001031332010

Декодирование сигнала

При декодировании входящий сигнал разделяется на блоки, размером в период символа протокола (для FT8 0.160 и FT4 0.048 сек), суммарной продолжительностью во временной слот протокола (15.0 и 7.5 секунд для FT8 и FT4 соответственно); далее каждый блок передается в оконное преобразование Фурье (WFFT, рисунок 14) с использованием оконной функции (например Ханна, рисунок 15 и формула 2), магнитуды частот преобразуются в амплитуды в децибелах.

Рисунок 14: Представление преобразования Фурье.

Рисунок 15: График оконной функции Ханна.

(2)

Дальнейшая обработка сигнала заключается в поиске сигналов-кандидатов (поиск по массиву Костаса), нормализация амплитуд сигнала кандидата и передача в LDPC-декодер (алгоритм Belief Propagation), то есть вверх по иерархии уровней абстрагирования.

Логику обработки сигнала можно инкапсулировать в класс Monitor:

kMin_score = 5
kMax_candidates = 140
kLDPC_iterations = 25
kMaxLDPCErrors = 32


@dataclass
class Waterfall:
   num_bins: int
   time_osr: int
   freq_osr: int
   protocol: int
   mag = typing.List[int]
   max_blocks: int

   num_blocks: int = 0
   block_stride: int = 0

   def __post_init__(self):
       self.block_stride = (self.time_osr * self.freq_osr * self.num_bins)
       self.mag = [0] * (self.max_blocks * self.time_osr * self.freq_osr * self.num_bins)


class Monitor:
   @staticmethod
   def hann_i(i: int, N: int) -> float:
       x = math.sin(math.pi * i / N)
       return x ** 2


   @staticmethod
   def ftx_normalize_logl(log174: typing.List[float]) -> typing.Generator[float, None, None]:
       ...


   def __init__(self, f_min: int, f_max: int, sample_rate: int, time_osr: int, freq_osr: int, protocol):
       slot_time = FTX_SLOT_TIMES[protocol]
       symbol_period = FTX_SYMBOL_PERIODS[protocol]

       self.block_size = int(sample_rate * symbol_period)
       self.subblock_size = int(self.block_size / time_osr)
       self.nfft = self.block_size * freq_osr
       self.fft_norm = 2.0 / self.nfft

       self.window = [self.fft_norm * self.hann_i(i, self.nfft) for i in range(self.nfft)]
       self.last_frame = [0.0] * self.nfft

       max_blocks = int(slot_time / symbol_period)

       self.min_bin = int(f_min * symbol_period)
       self.max_bin = int(f_max * symbol_period + 1)
       num_bins = self.max_bin - self.min_bin

       self.wf = Waterfall(max_blocks=max_blocks, num_bins=num_bins, time_osr=time_osr, freq_osr=freq_osr,
                           protocol=protocol)

       self.symbol_period = symbol_period

       self.max_mag = -120.0


   def monitor_process(self, frame: typing.List[float]):
       if self.wf.num_blocks >= self.wf.max_blocks:
           return False

       offset = self.wf.num_blocks * self.wf.block_stride
       frame_pos = 0

       for time_sub in range(self.wf.time_osr):
           for pos in range(self.nfft - self.subblock_size):
               self.last_frame[pos] = self.last_frame[pos + self.subblock_size]

           for pos in range(self.nfft - self.subblock_size, self.nfft):
               self.last_frame[pos] = frame[frame_pos]
               frame_pos += 1

           timedata = [self.window[pos] * self.last_frame[pos] for pos in range(self.nfft)]
           freqdata = np.fft.fft(timedata)[:self.nfft // 2 + 1]

           for freq_sub in range(self.wf.freq_osr):
               for bin in range(self.min_bin, self.max_bin):
                   src_bin = (bin * self.wf.freq_osr) + freq_sub
                   mag2 = freqdata[src_bin].imag ** 2 + freqdata[src_bin].real ** 2
                   db = 10.0 * math.log10(1E-12 + mag2)

                   scaled = int(2 * db + 240)
                   self.wf.mag[offset] = max(min(scaled, 255), 0)
                   offset += 1

                   self.max_mag = max(self.max_mag, db)

       self.wf.num_blocks += 1
       return True


   def ft8_sync_score(self, candidate: Candidate) -> int:
       ...


   def ft4_sync_score(self, candidate: Candidate) -> int:
       ...


   def ftx_sync_score(self, candidate: Candidate) -> int:
       ...


   def ftx_find_candidates(self, num_candidates: int, min_score: int) -> typing.List[Candidate]:
       ...


   def get_cand_mag_idx(self, candidate: Candidate) -> int:
       ...


   def ft4_extract_likelihood(self, cand: Candidate) -> typing.List[float]:
       log174 = [0.0] * FTX_LDPC_N

       mag = self.get_cand_mag_idx(cand)
       for k in range(FT4_ND):
           sym_idx = k + (5 if k < 29 else 9 if k < 58 else 13)
           bit_idx = 2 * k

           block = cand.time_offset + sym_idx
           if block < 0 or block >= self.wf.num_blocks:
               log174[bit_idx + 0] = 0
               log174[bit_idx + 1] = 0
           else:
               logl_0, logl_1 = self.ft4_extract_symbol(mag + sym_idx * self.wf.block_stride)
               log174[bit_idx + 0] = logl_0
               log174[bit_idx + 1] = logl_1

       return log174


   def ft8_extract_likelihood(self, cand: Candidate) -> typing.List[float]:
       log174 = [0.0] * FTX_LDPC_N

       mag = self.get_cand_mag_idx(cand)
       for k in range(FT8_ND):
           sym_idx = k + (7 if k < 29 else 14)
           bit_idx = 3 * k

           block = cand.time_offset + sym_idx
           if block < 0 or block >= self.wf.num_blocks:
               log174[bit_idx + 0] = 0
               log174[bit_idx + 1] = 0
               log174[bit_idx + 2] = 0
           else:
               logl_0, logl_1, logl_2 = self.ft8_extract_symbol(mag + sym_idx * self.wf.block_stride)
               log174[bit_idx + 0] = logl_0
               log174[bit_idx + 1] = logl_1
               log174[bit_idx + 2] = logl_2

       return log174


   def ft4_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float]:
    ...


   def ft8_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float, float]:
    ...


   def ftx_decode_candidate(
           self, cand: Candidate,
           max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.Optional[bytes], float]]:
       wf = self.wf

       if wf.protocol == FTX_PROTOCOL_FT4:
           log174 = self.ft4_extract_likelihood(cand)
       else:
           log174 = self.ft8_extract_likelihood(cand)

       log174 = list(self.ftx_normalize_logl(log174))
       ldpc_errors, plain174 = bp_decode(log174, max_iterations)

       if ldpc_errors > kMaxLDPCErrors:
           return None
       if not ftx_check_crc(plain174):
           return None

       a91 = self.pack_bits(plain174, FTX_LDPC_K)
       crc_extracted = ftx_extract_crc(a91)

       if wf.protocol == FTX_PROTOCOL_FT4:
           payload = bytearray(a91[i] ^ xor for i, xor in enumerate(FT4_XOR_SEQUENCE))
           tones = ft4_encode(payload)
       else:
           payload = a91
           tones = ft8_encode(payload)

       snr = self.ftx_subtract(cand, tones)
       return DecodeStatus(ldpc_errors, crc_extracted), payload, snr


   def decode(self) -> typing.Generator[typing.Tuple[float, float, float, str], None, None]:
       hashes = set()
       wf = self.wf

       candidate_list = self.ftx_find_candidates(kMax_candidates, kMin_score)
       for cand in candidate_list:
           freq_hz = (self.min_bin + cand.freq_offset + cand.freq_sub / wf.freq_osr) / self.symbol_period
           time_sec = (cand.time_offset + cand.time_sub / wf.time_osr) * self.symbol_period - 0.65

           if not (x := self.ftx_decode_candidate(cand, kLDPC_iterations)):
               continue

           status, message, snr = x

           if (crc := status.crc_extracted) in hashes:
               continue

           hashes.add(crc)

           call_to_rx, call_de_rx, extra_rx = ftx_message_decode(message)

           yield snr, time_sec, freq_hz, " ".join([call_to_rx, call_de_rx or "", extra_rx or ""])

Генераторная функция decode инкапсулирует логику подбора кандидатов и их декодирование ранее описанными функциями.

Функция выдает кортеж с данными о SNR, отклонении времени и декодированные сообщения в текстовом виде.

Функция ftx_decode_candidate принимает на вход сигнал-кандидат, вызывает методы ft4_extract_likelihood/ft8_extract_likelihood; данные извлеченные из сигнала, нормализованные функцией ftx_normalize_logl, передаются в LPDC-декодер (bp_decode), после чего проверяется CRC-14 (ftx_check_crc) и на основе декодированных данных декодируется сообщение.

Функции ft8_extract_likelihood и ft4_extract_likelihood извлекают биты данны из общего скопа сигналов, формируя список «аналоговых» бит.

Функция monitor_process реализует аккумулятор сигнала, принимая на вход блоки принятого дискретного сигнала, производя над ними преобразование Фурье и сохраняет результаты в «водопад» (структура Waterfall).

Параметрами конструктора класса Monitor являются ширина полосы частот принимаемого сигнала (f_min, f_max), частота дискретизации (sample_rate), параметры оверсемплинга (time_osr, freq_osr) и идентификатор протокола FT8/FT4.

Полный пример декодера с использованием класса Monitor:

import time
import numpy as np
from scipy.io.wavfile import read
from consts import FTX_PROTOCOL_FT8, FTX_PROTOCOL_FT4
from decode import Monitor


kFreq_osr = 2
kTime_osr = 4


def main():
   is_ft4 = False

   sample_rate, data = read("examples/signal.wav")

   amplitude = np.iinfo(data.dtype).max
   signal = data / amplitude

   protocol = FTX_PROTOCOL_FT4 if is_ft4 else FTX_PROTOCOL_FT8

   mon = Monitor(
       f_min=200,
       f_max=3000,
       sample_rate=sample_rate,
       time_osr=kTime_osr,
       freq_osr=kFreq_osr,
       protocol=protocol
   )

   frame_pos = 0
   while True:
       eof = frame_pos >= len(signal) - mon.block_size
       if eof or not mon.monitor_process(signal[frame_pos:frame_pos + mon.block_size]):
           print(f"Waterfall accumulated {mon.wf.num_blocks} symbols")
           print(f"Max magnitude: {mon.max_mag:+.2f} dB")

           tm_slot_start = 0
           ts1 = time.monotonic()
           for i, (snr, time_sec, freq_hz, text) in enumerate(mon.decode(tm_slot_start)):
               print(
                   f"{i + 1:03}\t"
                   f"{snr:+06.2f}dB\t"
                   f"{time_sec:-.2f}sec\t"
                   f"{freq_hz:.2f}Hz\t"
                   f"{text}"
               )

           mon.wf.num_blocks = 0
           mon.max_mag = -120.0

           ts2 = time.monotonic()
           print("-" * 20, "decoded @", ts2 - ts1, "sec")

       if eof:
           break

       frame_pos += mon.block_size


if __name__ == '__main__':
   main()

На рисунках 16 и 17 приведены спектрограмма формируемых сигналов для FT8 и FT4.

Рисунок 16: Спектрограмма сигнала FT8.

Рисунок 17: Спектрограмма сигнала FT4.

Расчет SNR

Один из способов вычисления SNR — определить амплитуды сигнала, исключить из полосы сигнала сам сигнал и просуммировать амплитуды в его окрестностях.

from copy import copy
from itertools import cycle


def ftx_subtract(self, candidate: Candidate, tones: typing.Iterable[int]) -> float:
   num_tones = FTX_TONES_COUNT[self.wf.protocol]

   can = copy(candidate)
   snr_all = 0.0

   tones = cycle(tones)
   for freq_sub in range(self.wf.freq_osr):
       can.freq_sub = freq_sub

       mag_cand = self.get_cand_mag_idx(can)
       noise = 0.0
       signal = 0.0
       num_average = 0

       for i, tone in enumerate(tones):
           block_abs = candidate.time_offset + i
           if block_abs < 0:
               continue

           if block_abs >= self.wf.num_blocks:
               break

           wf_el = mag_cand + i * self.wf.block_stride

           noise_val = 100000.0
           for s in filter(lambda x: x != tone, range(num_tones)):
               noise_val = min(noise_val, self.wf.mag[wf_el + s] * 0.5 - 120.0)

           noise += noise_val
           signal += self.wf.mag[wf_el + tone] * 0.5 - 120.0
           num_average += 1

       noise /= num_average
       signal /= num_average
       snr = signal - noise

       for i, tone in enumerate(tones):
           block_abs = candidate.time_offset + i
           if block_abs < 0:
               continue
           if block_abs >= self.wf.num_blocks:
               break
           wf_el = mag_cand + i * self.wf.block_stride
           self.wf.mag[wf_el + tone] -= snr * 2 + 240

       snr_all += snr

   return snr_all / self.wf.freq_osr / 2 - 22

Функция ftx_subtract (метод класса Monitor) перебирает тона исходного сигнала, вычисляет их амплитуды и амплитуды полосы сигнала, исключая амплитуды самих тонов. Функция возвращает результат деления уровня сигнала на уровень шумов. Побочным эффектом функции является удаление сигнала из водопада, что влияет на чувствительность декодирования соседних слабых сигналов.

Заключение

В статье были рассмотрены протоколы FT8 и FT4, их внутреннее устройство и принципы работы. Так, используя относительно простые и давно известные методы кодирования, реализуется достаточно помехоустойчивый протокол, позволяющий устанавливать дальние радиолюбительские связи. Также протоколы представляют интерес тем, что воплощают в себе принципы из таких областей как линейное кодирование и коррекция ошибок, радиолокация и цифровая обработка сигналов, частотная манипуляция, что также может являться предметом углубленного изучения.

Один из недостатков протоколов, подобных FT8/FT4, является тот факт, что радиообмен осуществляется в автоматическом режиме (в режиме модема), а полезными данными является только обмен метаданными в виде позывных, геолокации и уровнем SNR, хотя при этом протокол поддерживает обмен произвольными сообщениями очень малой длины и учитывая скорость обмена, работа в режиме чата достаточно затруднительная.

Так как архитектура протоколов представляет собой «матрешку» или «луковицу», технология может быть использована как базис для радиолюбительских экспериментов с передачей информации на большие расстояния.

Ссылки

  1. Исходный код реализации FT8/FT4 на Python

  2. Исходный код примера генератора сигнала FTX

  3. Исходный код примера декодера FTX

  4. Исходный код ft8_lib на C

  5. Статья с описанием протокола из журнала QEX

  6. Описание механизма синхронизации сигналов в FT8

  7. Программа WSJT, реализующая протоколы FTX

Комментарии (10)


  1. vesowoma
    15.07.2025 17:54

    Разработчики протокола преследовали цель обеспечить связь при очень низком уровне сигнала, в том числе ниже порога слышимости человека;

    Не понял этого момента. Радиолюбители обычно используют диапазоны сильно выше 20 кГц, что могут слышать люди, и в приемниках происходит преобразование передаваемого сигнала в аудиосигнал или визуальную информацию (как например SSTV)


    1. Vindicar
      15.07.2025 17:54

      Я полагаю, имеется ввиду, что данные, переданные по этому протоколу можно восстановить даже при таком соотношении сигнал-шум, при котором простая речь уже будет неразличима.


      1. bashkirtsevich Автор
        15.07.2025 17:54

        Всё верно. В контрпример еще приводят телеграф, его человек еще может различить там, где неразличима речь; протоколы FT8/FT4 обрабатываются программно, что позволяет превзойти чувствительность человека.


    1. bashkirtsevich Автор
      15.07.2025 17:54

      Здесь имеется ввиду порог слышимости в децибелах, когда сигнал настолько слабый, что человеческое ухо не может отличить его от фонового (эфирного) шума. Сами сигналы FT8/FT4 передаются на слышимых человеком частотах, то есть можно настроить приемник, например, на 14.074 МГц USB и услышать их.


      1. vesowoma
        15.07.2025 17:54

        Спасибо, теперь понятно. Просто термин "порог слышимости" для протокола, где обработка ведется не ушами и мозгом, а при помощи технических средств, ввел в заблуждение.


      1. Astroscope
        15.07.2025 17:54

        можно настроить приемник, например, на 14.074 МГц USB и услышать их

        А если прислонить к динамику приемника микрофон телефона с программой-декодером, то еще и увидеть, что именно передается. Для не-радиолюбителей этот обмен едва ли интересен, потому что, как вполне очевидно описано в статье, передаваемая информация состоит из позывных, примерных координат (локаторов), отчетов о силе (скорее об SNR) принимаемых сигналов, и из служебных флагов подтверждения обмена - традиционных для связистов "RRR" (собственно подтверждение) и "73" ("наилучшие пожелания", почти всегда передается в конце сеанса связи и поэтому дилетантами ошибочно понимается как "до свидания").


  1. NutsUnderline
    15.07.2025 17:54

    сразу захотелось посмотреть какая кодовая база для всего этого была в оригинале


    1. CitizenOfDreams
      15.07.2025 17:54

      Подозреваю, что Фортран. Джо Тейлор - это ученый старой школы.


  1. ra4hgn
    15.07.2025 17:54

    Полезные моды, но я предпочитаю SSB/CW


  1. RusAlex
    15.07.2025 17:54

    Какая скорость передачи у второго протокола ?