
FT8 — цифровой радиолюбительский протокол, разработанный Джо Тейлором (K1JT) и Стивом Франке (K9AN) в 2017 году. В этой статье будут рассмотрены подробности работы протокола.
Статья может быть интересна радиолюбителям, как знакомым, так и не знакомым с протоколами FT8 и FT4, а также тем, кто хочет в подробностях понять устройство этих протоколов.
Введение
Протокол FT8 был разработан радиолюбителями Джо Тейлором (K1JT) и Стивом Франке (K9AN) в 2017 году, чьи инициалы были увековечены в названии протокола.
Разработчики протокола преследовали цель обеспечить связь при очень низком уровне сигнала, в том числе ниже порога слышимости человека; в плохих условиях прохождения радиосигнала; при использовании передатчиков низкой мощности; с использованием узких полос частот для приема и передачи сигнала.
Для обеспечения связи при вышеописанных условиях, формируемый сигнал сильно растянут во времени (порядка 15 секунд на передачу) и, как следствие, имеет низкий битрейт. Исходя из этого, протокол был спроектирован для работы в тайм-слотах (временные интервалы), поэтому работа протокола должна быть синхронизирована с точностью до секунд с мировым временем.
FT4 был разработан в 2019 году и является более быстрой версией протокола FT8. Основные цели разработки протокола: увеличения количества QSO за счёт уменьшения времени передачи одного сообщения; проведения радиосвязей в условиях соревнований.
Из-за повышенной скорости протокол имеет более широкую полосу частот.
Протоколы используют GFSK (Gaussian Frequency Shift Keying) — частотную манипуляцию с использованием фильтра Гаусса. Цифры в названии протокола определяют количество тонов, используемых при FSK.
Общие технические характеристики
FT8
Цикл передачи 15 секунд;
Скорость передачи 6,09 бит/с;
Ширина полосы 50 Гц;
Размер сообщения 77 бит + 12-бит CRC-14;
Механизм коррекции ошибок LDPC (174,87);
Модуляция 8-GFSK с шагом 6.25Гц;
Порог декодирования сигналов до -24…-26 дБ относительно шума.
FT4
Цикл передачи 7.5 секунд;
Ширина полосы 90 Гц;
Размер сообщения 77 бит + 12-бит CRC-14;
Механизм коррекции ошибок LDPC (174,87);
Модуляция 4-GFSK с шагом 23.4Гц;
Порог декодирования сигналов до -24…-26 дБ относительно шума.
Структура протокола
Как и любой протокол передачи данных, FT8 и FT4 представляю собой «матрешку», в которой данные одного уровня преобразуют данные для передачи уровнем ниже.
Общая схема протокола приведена на рисунке 1. Разделение на уровни модели OSI условное.

Рисунок 1: Общая схема протокола (абстракция по уровням OSI условная).
На рисунке 1.А приведена схема взаимодействия алгоритмов кодирования при формировании исходящего сигнала; на рисунке 1.B — схема обработки и декодирования принятого сигнала.
Протокол прикладного уровня
При проведении связей в FT8/FT4 участники разделяются на принимающих и передающих.
Так как протокол привязан к временным интервалам, прием и передача осуществляются в четные и нечетные четверти (⅛ в случае FT4) минут, передающие корреспонденты излучают сигнал, в то время как в этот временной промежуток осуществляется прием и накопление сигнала на стороне слушающих; в следующем временном промежутке происходит смена ролей, принимавшие сигнал передают ответ (если посчитают нужным), а передавшие свой сигнал становятся слушателями и т.д. цикл повторяется. Модульная привязка к секундам необходима для избежания ситуации, когда корреспонденты начинают передавать в произвольном порядке и расшифровка сигнала становится невозможной.
Обмен данными сильно похож на обычный любительский радиообмен, когда часть корреспондентов дает общий вызов, а остальные слушают и принимают решение о попытке проведения связи с этим участником.
Сценарий обычного сеанса связи:
Временной отрезок |
Данные |
Комментарий |
00:00-00:15 |
CQ R3AAA KO85 |
Корреспондент из Москвы (локатор KO85), с позывным R3AAA приглашает для установления связи |
00:15-00:30 |
R3AAA R9FXX LO87 |
Отвечает корреспондент из Перми (локатор LO87) с позывным R9FXX |
00:30-00:45 |
R9FXX R3AAA -1 |
R3AAA дает рапорт для R9FXX в -1 dB по SNR |
00:45-01:00 |
R3AAA R9FXX R-4 |
R9FXX дает рапорт для R3AAA в -4 dB по SNR |
01:00-01:15 |
R9FXX R3AAA RR73 |
R3AAA подтверждает прием метаданных и прощается |
01:15-01:30 |
R3AAA R9FXX 73 |
R9FXX взаимно отправляет 73 |
Общее время проведения связи наняло 1 минуту 30 секунд.
Сценарий короткого сеанса связи:
Временной отрезок |
Данные |
Комментарий |
00:00-00:15 |
CQ R3AAA KO85 |
Корреспондент из Москвы (локатор KO85), с позывным R3AAA приглашает для установления связи |
00:15-00:30 |
R3AAA R9FXX R-4 |
R9FXX дает рапорт для R3AAA в -4 dB по SNR |
00:30-00:45 |
R9FXX R3AAA R-1 |
R3AAA дает рапорт для R9FXX в -1 dB по SNR |
00:45-01:00 |
R3AAA R9FXX RR73 |
R9FXX подтверждает прием метаданных и прощается |
01:00-01:15 |
R9FXX R3AAA 73 |
R3AAA взаимно отправляет 73 |
Сокращенные сеансы применяются в случаях когда у дальнего корреспондента большая очередь желающих провести связь.
В случаях когда кто-то из участников не может с первого раза получить ответ, сообщение может быть отослано повторно и сеанс связи потребует большего времени.
Пример сеанса радиосвязи R9FEU с R1CDY:
Время UTC |
SNR |
Δt |
Частота |
Данные |
141945 |
Tx |
1000 |
CQ R9FEU LO87 |
|
142000 |
-3 |
0.1 |
1001 |
R9FEU R1CDY KP40 |
142015 |
Tx |
1000 |
R1CDY R9FEU -03 |
|
142030 |
1 |
0.1 |
1001 |
R9FEU R1CDY R-17 |
142045 |
Tx |
1000 |
R1CDY R9FEU RR73 |
|
142100 |
-9 |
0.1 |
1001 |
R9FEU R1CDY 73 |
Пример сокращенного сеанса радиосвязи R9FEU c UA3DOI:
Время UTC |
SNR |
Δt |
Частота |
Данные |
141745 |
Tx |
1000 |
CQ R9FEU LO87 |
|
141800 |
8 |
0.1 |
1001 |
R9FEU UA3DOI -01 |
141815 |
Tx |
1000 |
UA3DOI R9FEU R+08 |
|
141830 |
-5 |
0.1 |
1001 |
R9FEU UA3DOI RR73 |
141845 |
Tx |
1000 |
UA3DOI R9FEU 73 |
Уровень представления
На уровне представления происходит подготовка данных, в частности перекодирование информации и расчет контрольной суммы сообщения CRC-14.
Формат сообщений
В основном, сообщения, передаваемые протоколами семейства FTX делятся на:
радиолюбительский обмен (QSO);
телеметрия;
произвольный текст.
Все радиолюбительские сообщения состоят из групп: вызываемый (call_to
), вызывающий (call_de
), данные (extra
). В качестве данных может передаваться SNR-рапорт, четырех-символьный квадрат локации и сигналы roger/73.
Кодирование сообщения
Большинство сообщений в протоколе состоят из трех полей: вызываемый, вызывающий и доп. данные (например рапорт или гео данные).
Кодирование заключается в том, чтобы максимально плотно уместить данные исходного сообщения в 77 бит.
Определение констант и утилит, необходимых для кодирования:
import string
FTX_CHAR_TABLE_NUMERIC = string.digits
FTX_CHAR_TABLE_LETTERS = string.ascii_uppercase
FTX_CHAR_TABLE_ALPHANUM = f"{FTX_CHAR_TABLE_NUMERIC}{FTX_CHAR_TABLE_LETTERS}"
FTX_CHAR_TABLE_LETTERS_SPACE = f" {FTX_CHAR_TABLE_LETTERS}"
FTX_CHAR_TABLE_ALPHANUM_SPACE = f" {FTX_CHAR_TABLE_ALPHANUM}"
FTX_CHAR_TABLE_ALPHANUM_SPACE_SLASH = f"{FTX_CHAR_TABLE_ALPHANUM_SPACE}/"
FTX_CHAR_TABLE_FULL = f"{FTX_CHAR_TABLE_ALPHANUM_SPACE}+-./?"
def charn(c: int, table: str) -> str:
return table[c]
def nchar(c: str, table: str) -> int:
return table.find(c)
В протоколах семейства FTX алфавит состоит из ASCII символов «ABCDEFGHIJKLMNOPQRSTUVWXYZ
», цифр «0123456789
» и символов «+-./?
», включая знак пробела.
Полный алфавит содержится в FTX_CHAR_TABLE_FULL
.
В алфавите FTX символу A соответствует значение в 1, B — 2 и т.д.. Для перевода из ANSI в алфавит FTX используется функция nchar
, которая по своей сути сопоставляет индекс кодируемого символа c
в таблице table, которой является одно из представлений значений FTX_CHAR_TABLE_*
. В случае обработки неподдерживаемого символа, функция возвращает значение -1
.
Для декодирования из формата FTX используется функция charn, которая решает обратную задачу, возвращает ANSI символ, соответствующей позиции c
в table
.
Кодирование позывного сигнала
Для кодирования позывного сигнала его необходимо нормализовать до 6-и символьного. Короткие позывные дополняются пробелами в начале строки, а длинные приводятся к другому виду. Швейцарские позывные из формата 3DA0XYZ
нормализуются в 3D0XYZ
, африканские 3XA0XYZ
в QA0XYZ
.
Функция кодирования позывного сигнала:
FTX_BASECALL_CHAR_MAP = [
FTX_CHAR_TABLE_ALPHANUM_SPACE,
FTX_CHAR_TABLE_ALPHANUM,
FTX_CHAR_TABLE_NUMERIC,
FTX_CHAR_TABLE_LETTERS_SPACE,
FTX_CHAR_TABLE_LETTERS_SPACE,
FTX_CHAR_TABLE_LETTERS_SPACE
]
def pack_basecall(callsign: str) -> int:
if (length := len(callsign)) > 2:
if callsign.startswith("3DA0") and 4 < length <= 7:
cs_6 = f"3D0{callsign[4:]}"
elif callsign.startswith("3X") and callsign[2].isalpha() and length <= 7:
cs_6 = f"Q{callsign[2:]}"
elif callsign[2].isdigit() and length <= 6:
cs_6 = callsign
elif callsign[1].isdigit() and length <= 5:
cs_6 = f" {callsign}"
else:
cs_6 = " " * 6
cs_6 = cs_6 + " " * (6 - len(cs_6))
n_chars = list(map(nchar, cs_6, FTX_BASECALL_CHAR_MAP))
if all(nc >= 0 for nc in n_chars):
n = reduce(lambda a, it: a * len(it[0]) + it[1], zip(FTX_BASECALL_CHAR_MAP, n_chars), 0)
return n
return -1
В FTX_BASECALL_CHAR_MAP
определяется последовательность алфавитов, на основе которых будет производиться кодирование позывного сигнала, таким образом, первая цифро-буквенная литера является опциональной; третьим символом обязательно должно быть цифровое значение, далее следуют буквенные символы, включая пробел.
Функция pack_basecall
на вход принимает строку с позывным сигналом, длина которого должна быть минимум 3 знака. Далее позывной нормализуется до 6-и символьного. В n_chars
содержатся закодированные по схеме FTX_BASECALL_CHAR_MAP
символы; затем, если перекодирование всех символов прошло успешно, выполняется битовая упаковка данных, путем умножения результирующего значения на мощности используемых алфавитов.
При кодировании позывного R1ABC
происходит нормализация к виду ' R1ABC
', Индексы символов: [0, 27, 1, 1, 2, 3]
; результат: 5334879
(0b10100010110011101011111
).
Декодирование позывного сигнала
Операция декодирования позывного сигнала выполняет в обратном порядке все те же самые операции, что и операция кодирования.
NTOKENS = 2063592
MAX22 = 4194304
FTX_BASECALL_SUFFIX_FMT = {
1: "{cs}/R",
2: "{cs}/P",
}
FTX_TOKEN_STR = {
v: k for k, v in FTX_TOKEN_CODE.items()
}
def unpack_callsign(cs_28: int, flags: bool, suffix: int) -> typing.Optional[str]:
if cs_28 < NTOKENS:
if cs_28 <= 2:
return FTX_TOKEN_STR.get(cs_28)
if cs_28 <= 1002:
return f"CQ_{cs_28 - 3:03}"
if cs_28 <= 532443:
n = cs_28 - 1003
aaaa = ""
for i in range(4):
ct = FTX_CHAR_TABLE_LETTERS_SPACE
ct_l = len(ct)
aaaa = charn(n % ct_l, ct) + aaaa
n //= ct_l
return f"CQ_{aaaa.strip()}"
return None
cs_28 -= NTOKENS
n = cs_28 - MAX22
callsign = ""
for ct in reversed(FTX_BASECALL_CHAR_MAP):
ct_l = len(ct)
callsign = charn(n % ct_l, ct) + callsign
n //= ct_l
callsign = callsign.strip()
if callsign.startswith("3D0") and callsign[3] != " ":
result = f"3DA0{callsign[3:]}"
elif callsign[0] == "Q" and callsign[1].isalpha():
result = f"3X{callsign[1:]}"
else:
result = callsign
if len(result) < 3:
return None
if flags:
if fmt := FTX_BASECALL_SUFFIX_FMT.get(suffix):
return fmt.format(cs=result)
raise ValueError
return result
Функция unpack_callsign
выполняет распаковку бит в строку, в соответствии с таблицами символов. Декодирование символов происходит в обратном порядке, reversed(FTX_BASECALL_CHAR_MAP)
. Функция также проверяет, являются ли переданные данные шаблонными значениями, такими как CQ
/DE
/QRZ
, перечисленными в FTX_TOKEN_STR
.
Также дополнительно происходит добавление суффиксов, если указан параметр flags (суффиксы /R
, /P
из FTX_CALLSIGN_SUFFIX_FMT
). Длинные позывные, подвергшиеся сокращению на этапе кодирования, также восстанавливаются до первоначального вида.
Результатом работы функции является строка, содержащая позывной сигнал в текстовом виде.
Кодирование локации
При установлении связи, корреспонденты обмениваются геолокационными квадратами (QTH Loc), представляющие собой строку из чередующихся пар букв и цифр, каждая пара уточняет границы расположения географической точки.
Расчет значений:
-
пара букв:
долгота делится на 20, целая часть от 0 до 17 кодируется буквами от
A
доR
;широта делится на 10, целая часть от 0 до 17 кодируется буквами от
A
доR
;
-
пара цифр:
остаток долготы от деления на 20 делится на 2;
остаток широты от деления на 10 делится на 1.
FTX_CHAR_TABLE_GRID_LETTERS = FTX_CHAR_TABLE_LETTERS[:18]
FTX_GRID_CHAR_MAP = [
FTX_CHAR_TABLE_GRID_LETTERS,
FTX_CHAR_TABLE_GRID_LETTERS,
FTX_CHAR_TABLE_NUMERIC,
FTX_CHAR_TABLE_NUMERIC
]
def pack_grid(grid4: str) -> int:
n_chars = list(map(nchar, grid4, FTX_GRID_CHAR_MAP))
n = reduce(lambda a, it: a * len(it[0]) + it[1], zip(FTX_GRID_CHAR_MAP, n_chars), 0)
return n
Для кодирования квадрата используется усеченный алфавит FTX_CHAR_TABLE_GRID_LETTERS
и карта соответствия FTX_GRID_CHAR_MAP
.
Аналогично функции pack_basecall
, функция pack_grid
переводит четырехсимвольный квадрат в число; например квадрат KO85
(Москва) будет закодирован в 19485
(0b100110000011101
).
Кодирование рапорта
Так как третьей частью сообщения может быть как геолокационный квадрат или рапорт, так же может быть и roger-ответ; при кодировании необходимо отличать тип передаваемых данных.
FTX_MAX_GRID_4 = 32400
FTX_RESPONSE_EXTRAS_CODE = {
"": FTX_MAX_GRID_4 + 1,
"RRR": FTX_MAX_GRID_4 + 2,
"RR73": FTX_MAX_GRID_4 + 3,
"73": FTX_MAX_GRID_4 + 4,
}
def pack_extra(extra: str) -> int:
if id_resp := FTX_RESPONSE_EXTRAS_CODE.get(extra):
return id_resp
if re.match(r"^(([A-R]{2})([0-9]{2}))$", extra):
return pack_grid(extra)
if not (report := re.match(r"^(R){0,1}([\+\-]{0,1}[0-9]+)$", extra)):
raise FTXInvalidRST
r_sign, r_val = report.groups()
i_report = int(r_val) + 35
return (FTX_MAX_GRID_4 + i_report) | (0x8000 if r_sign is not None else 0)
Функция pack_extra
сначала проверяет признак, не являются ли переданные данные roger-ответом, далее, если данные соответствуют регулярному выражению с геолокационным квадратом, осуществляется вызов ранее рассмотренной функции pack_grid
; в противном случае переданные данные интерпретируются как SNR-рапорт.
Декодирование рапорта и локации
Операция декодирования рапорта/локации, как и вышеописанная операции декодирования позывного сигнала, также в обратном порядке выполняет действия, осуществляемые при кодировании.
FTX_RESPONSE_EXTRAS_STR = {
v: k for k, v in FTX_RESPONSE_EXTRAS_CODE.items()
}
def unpack_extra(ex_16: int, is_report: bool) -> typing.Optional[str]:
if ex_16 <= FTX_MAX_GRID_4:
n = ex_16
dst = ""
for ct in reversed(FTX_GRID_CHAR_MAP):
ct_l = len(ct)
dst = charn(n % ct_l, ct) + dst
n //= ct_l
return f"{'R ' if is_report else ''}{dst}"
else:
if irpt := FTX_RESPONSE_EXTRAS_STR.get(ex_16):
return irpt
return f"{'R' if is_report else ''}{int(ex_16 - FTX_MAX_GRID_4 - 35):+03}"
Функция unpack_extra
проверяет, являются ли переданные данные локатором или SNR-рапортом. Декодирование локации осуществляется с конца к началу.
При передаче рапорта, дополнительно, осуществляется проверка на шаблонный ответ из FTX_RESPONSE_EXTRAS_STR
.
Результатом работы функции является строка, содержащая либо идентификатор локации, либо рапорт (или шаблонный ответ, например RR73).
Кодирование стандартного сообщения
Стандартное сообщение состоит из трех частей:
общий вызов или позывной вызываемого;
позывной вызываемого;
квадрат локации или рапорт/roger-сигнал.
NTOKENS = 2063592
MAX22 = 4194304
FTX_TOKEN_CODE = {
"DE": 0,
"QRZ": 1,
"CQ": 2
}
def byte(i: int) -> int:
return i & 0xff
def endswith_any(s: str, *tails: str) -> bool:
return any(s.endswith(tail) for tail in tails)
def pack_callsign(callsign: str) -> typing.Tuple[int, int]:
shift = 0
if token := FTX_TOKEN_CODE.get(callsign):
return token, shift
length = len(callsign)
if callsign.startswith("CQ_") and length < 8:
rest = callsign[3:]
rest_len = len(rest)
if rest_len == 3 and rest.isdigit():
return int(rest) + 3, shift
if 1 <= rest_len <= 4:
nlet = 0
correct = True
for c in rest:
if (n := nchar(c, FTX_CHAR_TABLE_LETTERS_SPACE)) == -1:
correct = False
break
nlet = nlet * 27 + n
if correct:
return nlet + 1003, shift
length_base = length
if endswith_any(callsign, "/P", "/R"):
shift = 1
length_base = length - 2
if (n28 := pack_basecall(callsign[:length_base])) >= 0:
return dword(NTOKENS + MAX22 + n28), shift
raise FTXPackCallsignError
def ftx_message_encode_std(call_to: str, call_de: str, extra: str) -> typing.ByteString:
b28_to, sh_to = pack_callsign(call_to)
if b28_to < 0:
raise FTXErrorCallSignTo
b28_de, sh_de = pack_callsign(call_de)
if b28_de < 0:
raise FTXErrorCallSignDe
suffix = 1
if any(cs.endswith("/P") for cs in (call_to, call_de)):
suffix = 2
if any(cs.endswith("/R") for cs in (call_to, call_de)):
raise FTXErrorSuffix
if call_to == "CQ" and "/" in call_de and not endswith_any(call_de, "/P", "/R"):
raise FTXErrorCallSignDe
b16_extra = pack_extra(extra)
b29_to = dword(b28_to << 1 | sh_to)
b29_de = dword(b28_de << 1 | sh_de)
if endswith_any(call_to, "/P", "/R"):
b29_to |= 1
if call_to.endswith("/P"):
suffix = 2
bytes = [
byte(b29_to >> 21),
byte(b29_to >> 13),
byte(b29_to >> 5),
byte(b29_to << 3) | byte(b29_de >> 26),
byte(b29_de >> 18),
byte(b29_de >> 10),
byte(b29_de >> 2),
byte(b29_de << 6) | byte(b16_extra >> 10),
byte(b16_extra >> 2),
byte(b16_extra << 6) | byte(suffix << 3)
]
return bytearray(b for b in bytes)
Функция ftx_message_encode_std
принимает на вход три аргумента с вышеописанными данными. При кодировании позывных сигналов, функция pack_callsign
проверяет, является ли позывной специальным токеном (например CQ
при общем вызове), для которых, для повышения плотности, присвоены специальные коды в FTX_TOKEN_CODE
; далее, используя вышеописанные функции битового кодирования, позывной кодируется в набор бит в виде целого числа, после чего данные конкатенируются в массив байт.
Суффиксы позывных сигналов отмечаются в третьем блоке данных сообщения. Код byte(suffix << 3)
записывает в сообщение признак того, что это стандартное сообщение. Значение «1» суффикса соответствует обычному сообщению; значение «2» — проведение связей в формате DX-педиции.
Перечень типов сообщений:
FTX_MESSAGE_TYPE_FREE_TEXT
— 1;FTX_MESSAGE_TYPE_DXPEDITION
— 2;FTX_MESSAGE_TYPE_EU_VHF
— 3;FTX_MESSAGE_TYPE_ARRL_FD
— 4;FTX_MESSAGE_TYPE_TELEMETRY
— 6;FTX_MESSAGE_TYPE_CONTESTING
— 7;FTX_MESSAGE_TYPE_STANDARD
— 8;FTX_MESSAGE_TYPE_ARRL_RTTY
— 10;FTX_MESSAGE_TYPE_NONSTD_CALL
— 11;FTX_MESSAGE_TYPE_WWROF
— 12.
Примеры кодирования сообщений:
Сообщение |
Закодированные данные (hex) |
CQ R1ABC KO85 |
00000020587223930748 |
R2CBA R1ABC R+01 |
0b136da0587223bfad08 |
R1ABC R2CBA -20 |
0b0e4470589b6d1fa7c8 |
R2CBA R1ABC RR73 |
0b136da05872239fa4c8 |
Декодирование стандартного сообщения
Декодирование сообщения осуществляется выполнением в обратной последовательности тех же действий, что и при кодировании.
def ftx_message_decode_std(payload: typing.ByteString) -> typing.Tuple[str, str, str]:
b29_to = payload[0] << 21
b29_to |= payload[1] << 13
b29_to |= payload[2] << 5
b29_to |= payload[3] >> 3
b29_de = (payload[3] & 0x07) << 26
b29_de |= payload[4] << 18
b29_de |= payload[5] << 10
b29_de |= payload[6] << 2
b29_de |= payload[7] >> 6
r_flag = (payload[7] & 0x20) >> 5
b16_extra = (payload[7] & 0x1F) << 10
b16_extra |= payload[8] << 2
b16_extra |= payload[9] >> 6
cs_flags = (payload[9] >> 3) & 0x07
if (call_to := unpack_callsign(b29_to >> 1, bool(b29_to & 1), cs_flags)) is None:
raise FTXErrorCallSignTo
if (call_de := unpack_callsign(b29_de >> 1, bool(b29_de & 1), cs_flags)) is None:
raise FTXErrorCallSignDe
if (extra := unpack_extra(b16_extra, bool(r_flag & 1))) is None:
raise FTXErrorGrid
return call_to, call_de, extra
Функция ftx_message_decode_std
принимает на вход строку байт, битовыми операциями извлекает участки данных с закодированными позывными и рапортом/локацией. Далее функция передает извлеченные данные в функцию unpack_callsign
и unpack_extra
, которые декодируют значения и возвращают данные в строковом виде.
Результатом работы функции является тройка строковых данных, состоящих из вызываемого и вызывающего позывных, а также доп данных в виде локации/лапорта.
Телеметрия и произвольный текст
Протоколы семейства FTX предусматривают возможность отправки данных, не относящихся к стандартному радиолюбительскому обмену, например передавать данные телеметрии. Так как телеметрия представляет собой последовательность байт и максимальный размер передаваемых данных составляет 77 бит, то используя механизм упаковки бит, можно закодировать текстовое сообщение длиной до 12 символов.
FTX_MESSAGE_FREE_TEXT_LEN = 12
FTX_MESSAGE_TELEMETRY_LEN = 9
def ftx_message_encode_free(text: str) -> typing.ByteString:
if len(text) > FTX_MESSAGE_FREE_TEXT_LEN:
raise FTXErrorTooLong
payload = bytearray(b"\x00" * FTX_MESSAGE_TELEMETRY_LEN)
text = (" " * (FTX_MESSAGE_FREE_TEXT_LEN - len(text))) + text
for c in text:
if (cid := nchar(c, FTX_CHAR_TABLE_FULL)) == -1:
raise FTXErrorInvalidChar
rem = cid
for i in reversed(range(FTX_MESSAGE_TELEMETRY_LEN)):
rem += payload[i] * len(FTX_CHAR_TABLE_FULL)
payload[i] = byte(rem)
rem >>= 8
return ftx_message_encode_telemetry(payload)
def ftx_message_encode_telemetry(payload: typing.ByteString) -> typing.ByteString:
if len(payload) > FTX_MESSAGE_TELEMETRY_LEN:
raise FTXErrorTooLong
carry = 0
data = bytearray(b"\x00" * len(payload))
for i, t_byte in enumerate(reversed(payload)):
data[-i - 1] = byte((carry >> 7) | (t_byte << 1))
carry = byte(t_byte & 0x80)
return data
Функция ftx_message_encode_free
осуществляет упаковку бит символов из text, используя таблицу FTX_CHAR_TABLE_FULL
, в 9-и битный массив байт и передает его для дальнейшего кодирования в ftx_message_encode_telemetry
. Перед кодированием исходный текст нормализуется до 12-и символьной строки, путем добавления необходимого количества пробелов в начало текста.
Таким образом, строка «0123456789AB
» будет преобразована в массив байт (payload
) «000a7271499bcb384a
».
Функция ftx_message_encode_telemetry
выполняет битовый сдвиг данных для выравнивания по правому краю.
Результатом является массив байт, в каждой ячейке которого значения сдвинуты влево на 1 бит, относительно исходных данных.
Декодирования телеметрии и произвольного текста осуществляется в обратном порядке: принятые данные телеметрии сдвигаются на 1 бит вправо, а текст сообщения раскодируется по таблице FTX_CHAR_TABLE_FULL
обратно в строку.
def ftx_message_decode_telemetry(data: typing.ByteString) -> typing.Generator[int, None, None]:
carry = 0
for p_byte in data:
yield byte((carry << 7) | (p_byte >> 1))
carry = byte(p_byte & 0x01)
def ftx_message_decode_free(data: typing.ByteString) -> str:
payload = bytearray(ftx_message_decode_telemetry(data))
text = " "
for _ in range(FTX_MESSAGE_FREE_TEXT_LEN):
rem = 0
for i in range(FTX_MESSAGE_TELEMETRY_LEN):
rem = (rem << 8) | payload[i]
payload[i] = byte(rem // 42)
rem = rem % 42
text = charn(rem, FTX_CHAR_TABLE_FULL) + text
return text.strip()
CRC-14
Расчет CRC-14
В качестве проверки целостности сообщения в протоколах семейства FTX предусмотрен расчет 14-и битного циклического избыточного кода (контрольной суммы) — CRC-14. Данный код подтверждает факт целостности данных в принятом сигнале; сигналы, в которых расчетная контрольная сумма сообщения не совпадает с фактической, игнорируются на принимающей стороне.
При 14-и битной контрольной сумме, алгоритм CRC-14 обеспечивает высокую вероятность обнаружения ошибок на коротких сообщениях, и минимально расходует объем передаваемых данных. По сравнению. например, с CRC-8, рассматриваемый алгоритм позволяет находить до 4 бит ошибок, а также имеет более низкую вероятность коллизии (1/16384 против 1/256 для CRC-8). Таким образом, алгоритм CRC-14 используется как компромисс между компактностью и надежностью; алгоритм CRC-8 и близкие к нему, более применимы к совсем коротким сообщениям.
FTX_LDPC_K = 91
FTX_LDPC_M = 83
FTX_LDPC_N = FTX_LDPC_K + FTX_LDPC_M
FTX_LDPC_N_BYTES = ((FTX_LDPC_N + 7) // 8)
FTX_LDPC_K_BYTES = ((FTX_LDPC_K + 7) // 8)
FTX_CRC_POLYNOMIAL = 0x2757
FTX_CRC_WIDTH = 14
FTX_PAYLOAD_BITS = 96
FTX_MESSAGE_BITS = FTX_PAYLOAD_BITS - FTX_CRC_WIDTH
TOPBIT = 1 << (FTX_CRC_WIDTH - 1)
def ftx_compute_crc(message: typing.ByteString, num_bits: int) -> int:
remainder = 0
idx_byte = 0
for idx_bit in range(num_bits):
if idx_bit % 8 == 0:
remainder ^= message[idx_byte] << (FTX_CRC_WIDTH - 8)
idx_byte += 1
if remainder & TOPBIT != 0:
remainder = (remainder << 1) ^ FTX_CRC_POLYNOMIAL
else:
remainder = remainder << 1
return remainder & ((TOPBIT << 1) - 1)
def ftx_add_crc(payload: typing.ByteString) -> typing.ByteString:
message = payload + (b"\x00" * (FTX_LDPC_K_BYTES - len(payload)))
message[-3] &= 0xf8
message[-2] = 0
checksum = ftx_compute_crc(message, FTX_MESSAGE_BITS)
message[-3] |= byte(checksum >> 11)
message[-2] = byte(checksum >> 3)
message[-1] = byte(checksum << 5)
return message
Функция ftx_compute_crc
реализует алгоритм вычисления CRC-14. Полином, для вычисления остатка от деления данных, определен в FTX_CRC_POLYNOMIAL
со значением 0x2757
(0b10011101010111
).
Задача функции ftx_add_crc
рассчитать и добавить к исходным данных значение функции ftx_compute_crc; предварительно данные подгоняются под длину сообщения в 12 байт (98 бит), контрольная сумма записывается в три последних байта сообщения.
Пример контрольной суммы для данных 00000020587223930748
(закодированное сообщение CQ R1ABC KO85
): 11173
(0x2ba5
); результат добавления контрольной суммы к данным: 0000002058722393074d74a0
.
Проверка CRC-14
Так как принимаемое сообщение представляет собой массив байт, в котором содержатся сами данные сообщения и контрольный проверочный код CRC-14, при проверке достаточно отделить контрольную сумму от данных, выполнить пересчет CRC-14 и сравнить полученный результат с принятой контрольной суммой.
def ftx_crc(msg1: typing.ByteString, msglen: int) -> typing.ByteString:
div = [1, 1, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 1]
msg = bytearray(b"\x00" * (FTX_LDPC_M + FTX_CRC_WIDTH))
for i in range(msglen + FTX_CRC_WIDTH):
if i < 77:
msg[i] = msg1[i]
for i in range(msglen):
if msg[i] != 0:
for j, d in enumerate(div):
msg[i + j] = msg[i + j] ^ d
return msg[msglen:msglen + FTX_CRC_WIDTH]
def ftx_check_crc(a91: typing.ByteString) -> bool:
out1 = ftx_crc(a91, 82)
for i, b in enumerate(out1):
if b != a91[FTX_LDPC_K - FTX_CRC_WIDTH + i]:
return False
return True
Функция ftx_check_crc
осуществляет проверку CRC-14 в блоке данных a91
, извлекая 82 бита данных из проверяемого 91 битного блока данных и сверяет совпадение CRC-14 данных с контрольным кодом, находящимся в хвосте a91
.
Результатом функции является булево значение.
LDPC (низкоплотностный код)
LDPC (Low-Density Parity-Check code) — код с малой плотностью проверок на чётность. Алгоритм используется для исправления ошибок, возникающих при передаче данных, повышая общую помехоустойчивость.
Более подробное описание теории и принципов работы LDPC достаточно объемное и выходит за рамки статьи.
С подробностями устройства и работы LDPC можно ознакомиться здесь:
https://en.wikipedia.org/wiki/Low-density_parity-check_code
https://habr.com/ru/articles/453086/
https://habr.com/ru/articles/830150/
Кодирование LDPC
LDPC-код относится к категории линейных блочных кодов, определяемых матрицей проверки на четность с небольшим количеством единиц; таким образом, в матрице, используемой при кодировании, количество единиц относительно невелико по сравнению с общим количеством элементов в матрице.

Рисунок 2: Граф Таннера и матрица проверок LDPC.
На рисунке 2 приведена схема представления работы LDPC-кодов, в частности LDPC-код (12, 8); в кругах, пронумерованных от 1 до 8 обозначены данные исходного сообщения, в квадратах с номерами от 1 до 4 — биты четности с такими значениями, чтобы результат сложения бит данных с битом четности, по модулю 2, был равен нулю.
Проверочная матрица H в столбцах определяет то, с какими битами исходного сообщения связан бит четности (так, на рисунке линиями первый бит четности соединен с 1, 4,5, 7 и 8 битами сообщения, что соответствует первой строке матрице в виде записи [1 0 0 1 1 0 1 1]); количество строк матрицы определяет то, сколько битов четности будет рассчитано. Также проверочная матрица H должна обладать свойствами разреженности и нерегулярности.
Таким образом, зная дополнительные биты четности и структуру проверочной матрицы, повторно выполнив суммирование бит по модулю 2, можно проверить корректность данных и, в случае искажения данных, локализовать и устранить ошибку, так как на один бит данных может ссылаться несколько битов проверки четности. Таким же образом осуществляется определение синдрома ошибки в самих проверочных битах.
Протоколы семейства FTX используют LDPC(174, 87), то есть к исходным данным добавляется 87 бит с проверкой на четность.
Таким образом, на принимающей стороне, при потере или искажении части сообщения, сигнал можно будет восстановить используя избыточные биты рассчитанные при LDPC-кодировании.
FTX_LDPC_GENERATOR = [
[0x83, 0x29, 0xce, 0x11, 0xbf, 0x31, 0xea, 0xf5, 0x09, 0xf2, 0x7f, 0xc0],
[0x76, 0x1c, 0x26, 0x4e, 0x25, 0xc2, 0x59, 0x33, 0x54, 0x93, 0x13, 0x20],
[0xdc, 0x26, 0x59, 0x02, 0xfb, 0x27, 0x7c, 0x64, 0x10, 0xa1, 0xbd, 0xc0],
...
[0x60, 0x8c, 0xc8, 0x57, 0x59, 0x4b, 0xfb, 0xb5, 0x5d, 0x69, 0x60, 0x00]
]
def parity8(x: int) -> int:
for i in [4, 2, 1]:
x ^= x >> i
return byte(x % 2)
def ftx_encode(message: typing.ByteString) -> typing.ByteString:
codeword = bytearray(message[i] if i < FTX_LDPC_K_BYTES else 0 for i in range(FTX_LDPC_N_BYTES))
col_mask = 0x80 >> (FTX_LDPC_K % 8)
col_idx = FTX_LDPC_K_BYTES - 1
for i in range(FTX_LDPC_M):
nsum = 0
for j in range(FTX_LDPC_K_BYTES):
bits = message[j] & FTX_LDPC_GENERATOR[i][j]
nsum ^= parity8(bits)
if nsum % 2:
codeword[col_idx] |= col_mask
col_mask >>= 1
if col_mask == 0:
col_mask = 0x80
col_idx += 1
return codeword
Проверочная матрица определена в FTX_LDPC_GENERATOR
, так как она достаточно большая, чтобы целиком приводить ее в тексте, приведены только начало и конец матрицы. Саму матрицу целиком можно посмотреть здесь.
Функция ftx_encode
осуществляет LDPC кодирование данных, переданных в параметре message; результатом функции является 174 битный код, содержащий в себе исходные данные из message, включая 87 LDPC-бит.
Функция parity8
осуществляет побитовую операцию XOR для расчета бита четности.
Пример:
message
: 0000002058722393074d74a0
ftx_encode
: 0000002058722393074d74a67d749e15d81ecea9e3a0
Декодирование LDPC
Один из популярных алгоритмов декодирования LDPC-кодов — алгоритм Belief Propagation (алгоритм распространения доверия).
Каждая итерация алгоритма состоит из двух шагов:
узлы данных передают сообщения узлам проверок;
проверочные узлы передают сообщения узлам данных.
Итерации повторяются до достижения приемлемого количества ошибок, либо до достижения максимального количества итераций.
FTX_LDPC_NM = [
[4, 31, 59, 91, 92, 96, 153],
...
[17, 42, 75, 129, 170, 172, 0],
]
FTX_LDPC_MN = [
[16, 45, 73],
...
[42, 49, 57],
]
FTX_LDPC_NUM_ROWS = [
7, 6, 6, 6, 7, 6, 7, 6, 6, 7, 6, 6, 7, 7, 6, 6,
...
6, 6, 6
]
def fast_tanh(x: float) -> float:
if x < -4.97:
return -1.0
if x > 4.97:
return 1.0
x2 = x ** 2
a = x * (945.0 + x2 * (105.0 + x2))
b = 945.0 + x2 * (420.0 + x2 * 15.0)
return a / b
def fast_atanh(x: float) -> float:
x2 = x ** 2
a = x * (945.0 + x2 * (-735.0 + x2 * 64.0))
b = (945.0 + x2 * (-1050.0 + x2 * 225.0))
return a / b
def ldpc_check(codeword: bytes) -> int:
errors = 0
for m in range(FTX_LDPC_M):
x = 0
for i in range(FTX_LDPC_NUM_ROWS[m]):
x ^= codeword[FTX_LDPC_NM[m][i] - 1]
if x:
errors += 1
return errors
def bp_decode(codeword: typing.List[float], max_iters: int) -> typing.Tuple[int, typing.ByteString]:
min_errors = FTX_LDPC_M
tov = [[0.0] * 3 for _ in range(FTX_LDPC_N)]
toc = [[0.0] * 7 for _ in range(FTX_LDPC_M)]
plain = bytearray(b"\x00" * FTX_LDPC_N)
for _ in range(max_iters):
plain_sum = 0
for n in range(FTX_LDPC_N):
plain[n] = int((codeword[n] + tov[n][0] + tov[n][1] + tov[n][2]) > 0)
plain_sum += plain[n]
if plain_sum == 0:
min_errors = FTX_LDPC_M
break
if (errors := ldpc_check(plain)) < min_errors:
min_errors = errors
if errors == 0:
break
for m in range(FTX_LDPC_M):
for n_idx in range(FTX_LDPC_NUM_ROWS[m]):
n = FTX_LDPC_NM[m][n_idx] - 1
Tnm = codeword[n]
for m_idx in range(3):
if (FTX_LDPC_MN[n][m_idx] - 1) != m:
Tnm += tov[n][m_idx]
toc[m][n_idx] = fast_tanh(-Tnm / 2)
for n in range(FTX_LDPC_N):
for m_idx in range(3):
m = FTX_LDPC_MN[n][m_idx] - 1
Tmn = 1.0
for n_idx in range(FTX_LDPC_NUM_ROWS[m]):
if (FTX_LDPC_NM[m][n_idx] - 1) != n:
Tmn *= toc[m][n_idx]
tov[n][m_idx] = -2 * fast_atanh(Tmn)
return min_errors, plain
def ftx_normalize_logl(log174: typing.List[float]) -> typing.Generator[float, None, None]:
sum = 0
sum2 = 0
for it in log174:
sum += it
sum2 += it ** 2
inv_n = 1.0 / FTX_LDPC_N
variance = (sum2 - (sum * sum * inv_n)) * inv_n
norm_factor = math.sqrt(24.0 / variance)
for it in log174:
yield it * norm_factor
Функция bp_decode
реализует алгоритм Belief Propagation, в параметр codeword передается нормализованные функцией ftx_normalize_logl
данные для декодирования; параметр max_iters
определяет максимальное количество итераций на попытку раскодировать данные.
Результатом работы функции является кортеж, содержащий в себе счетчик ошибок и набор байт декодированных данных.
Матрица FTX_LDPC_NM
содержит строки проверки четности, каждое значение — индекс в кодовом слове.
Матрица FTX_LDPC_MN
описывает биты кодового слова, числа определяют какие три проверки четности из FTX_LDPC_NM
относятся к кодовому слову.
Размеры матриц слишком большие чтобы полностью рассмотреть их в рамках статьи и доступны по ссылкам:
Примечание: рассматриваемый пример слабо оптимизирован и имеет высокую алгоритмическую сложность, на Python работает достаточно медленно на сильно зашумленных данных.
Транспортный уровень
На транспортном уровне в протоколах семейства FTX происходит перекодирование бит данных в код Грея и добавление данных матриц Костаса в качестве маркеров сигнала.
Код Грея
Код Грея (Gray code) — двоичный, зеркальный код, в котором две соседние комбинации отличаются одной цифрой (расстояние Хемминга между соседними комбинациями кода равно единице).
Пример записи кода Грея для размерностей 1, 2 и 3 приведен на рисунке 3, также можно заметить симметричность кода, из-за чего код и получил название «зеркальный код».

Рисунок 3: Коды Грея размерностей 1, 2 и 3, и его свойство зеркальности.


Рисунок 4: Круговой энкодер с кодом Грея и его развернутое представление.
На рисунке 4 приведен пример использования кода Грея в энкодере.
Более подробно про код Грея можно прочитать здесь.
Исходя из этих свойств, код Грея позволяет снизить вероятность ошибки при изменении значений без сигналов тактирования.
В протоколах семейства FTX код Грея также применяется для повышения помехоустойчивости, снижая вероятность ошибки на принимающей стороне при декодировании, таким образом, приемник сможет исправить ошибку, так как соседние значения будут отличаться она один бит.
Для FT8 используется трех битный код Грея, для FT4 — двух.
Алгоритм расчета кода Грея:
def bin_to_gray(b: int) -> int:
return b ^ (b >> 1)
for i in range(10):
print(f"{i}, {i:5b}, {bin_to_gray(i):5b}")
Результат расчета:
dec, bin, gray
0, 00000, 00000
1, 00001, 00001
2, 00010, 00011
3, 00011, 00010
4, 00100, 00110
5, 00101, 00111
6, 00110, 00101
7, 00111, 00100
8, 01000, 01100
9, 01001, 01101
Перевод в код Грея
Исходя из таблицы, коды Грея можно представить в виде списков, где значения индексов списка будут совпадать с представлением числа в формате Грея:
FT8_GRAY_MAP = [0, 1, 3, 2, 5, 6, 4, 7]
FT4_GRAY_MAP = [0, 1, 3, 2]
Таким образом, задача перевода данных из двоичного кода в код Грея реализуется кодом:
mask = 0x80
i_byte = 0
bits3 = 0
for bit_or in [4, 2, 1]:
if codeword[i_byte] & mask:
bits3 |= bit_or
mask >>= 1
if mask == 0:
mask = 0x80
i_byte += 1
yield FT8_GRAY_MAP[bits3]
mask = 0x80
i_byte = 0
bits2 = 0
for bit_or in [2, 1]:
if codeword[i_byte] & mask:
bits2 |= bit_or
mask >>= 1
if mask == 0:
mask = 0x80
i_byte += 1
yield FT4_GRAY_MAP[bits2]
Где bits3
и bits2
— биты исходного сообщения codeword
.
Перевод из кода Грея
Декодирование в протоколах семейства FTX осуществляется перебором, путем выделения из сигнала тех бит, амплитуды которых в сигнале максимальны, они, в свою очередь, дадут представление о битах в двоичной форме.
def ft4_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float]:
s2 = [self.wf.mag[mag_idx + gc] for gc in FT4_GRAY_MAP]
logl_0 = max(s2[2], s2[3]) - max(s2[0], s2[1])
logl_1 = max(s2[1], s2[3]) - max(s2[0], s2[2])
return logl_0, logl_1
def ft8_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float, float]:
s2 = [self.wf.mag[mag_idx + gc] for gc in FT8_GRAY_MAP]
logl_0 = max(s2[4], s2[5], s2[6], s2[7]) - max(s2[0], s2[1], s2[2], s2[3])
logl_1 = max(s2[2], s2[3], s2[6], s2[7]) - max(s2[0], s2[1], s2[4], s2[5])
logl_2 = max(s2[1], s2[3], s2[5], s2[7]) - max(s2[0], s2[2], s2[4], s2[6])
return logl_0, logl_1, logl_2
Функции ft4_extract_symbol
и ft8_extract_symbol
принадлежат классу Monitor, инкапсулирующего в себе логику анализа и декодирования данных (данный тезис будет более понятен при чтении статьи от конца к началу). Параметр mag_idx
— номер корзины в спектре FFT.
Результат работы функции — кортеж из амплитудных значений бит данных.
Массив Костаса
Массив Костаса (Costas array) — тип решетки точек n*n
, в которой ни одна пара точек, не имеет одинакового вектора смещения; иными словами в массиве существует только одна точка в строке и одна точка в столбце и расстояние между любыми двумя парами точек уникально, тем самым, два массива Костаса с одинаковыми значениями после операций параллельного переноса и транспонирования могут быть взаимно симметричны только тогда, когда полностью совпадают расположения точек в обоих массивах.
Исходя из этого, автокорреляционная функция (АКФ) по точкам из массива Костаса стремится к нулю, тем самым минимизируя ложные совпадения и взаимные помехи.

Рисунок 5: Примеры сигналов и графики их АКФ.
На рисунке 5 приведены примеры функций и графики их АКФ; при высоком уровне автокорреляций, огибающая графика имеет плавный рост и убывание на всей длине сигнала (рисунок 5, график A), а при низкой автокорреляции заметного роста не происходит, но, когда сигнал совпадает сам с собой, возникает резкий всплеск, например, в случае с белым шумом (рисунок 5, график D), другими словами при высокой автокорреляции функции градиент плавный, при низкой — резкий.
В протоколах семейства FTX массивы костаса используются в качестве маркеров синхронизации, решая задачу выделения полезных сигналов при наличии интерференций и шума.
Принцип применения массива Костаса можно рассмотреть на примере.
Допустим, имеется диагональная матрица из светодиодов и непрозрачная маска с отверстиями, которые расположены на тех же местах, что и светодиоды (рисунок 6), таким образом, перемещая маску по вертикали и по горизонтали можно добиться полного совпадения отверстий в маске со светодиодами (рисунок 7.2); однако, если один из светодиодов будет погашен (тем самым имитируя частичную потерю или искажение данных), то при такой конфигурации маски и светодиодов возникает неопределенность, когда невозможно точно определить находится ли найденный массив в начале или конце маски (рисунок 7.1).

Рисунок 6: Маска с отверстиями и диодная матрица.

Рисунок 7: 1: Состояние неопределенности; 2: Поиск диодов по маске.
Конфигурация маски и светодиодной матрицы с расположением отверстий и светодиодов согласно массиву Костаса 3*3
решает проблему неопределенности (рисунок 8), таким образом, если один из светодиодов будет выключен, то оставшихся двух будет достаточно для выравнивания маски, избегая состояния неопределенности.

Рисунок 8: маска и светодиодная матрица сконфигурированные по массиву Костаса.
Использование массива Костаса также обеспечивает точность позиционирования, если данные скомпрометированы, например один из светодиодов будет расположен значительно выше или ниже ожидаемого положения.
При использовании массива Костаса 3*3
состояние неопределенности наступит вновь, когда будут отключены два или более светодиодов.
Исходя из описанных свойств и обеспечения точности синхронизации сигналов во времени и частоте, массивы Костаса нашли применение в гидро и радиолокации.
Внедрение массива Костаса в сигнал
В протоколе FT8 используется массив Костаса 7*7
, со значениями [3, 1, 4, 0, 6, 5, 2]
и записывается в начало, середину и конец сообщения.
В отличие от FT8, протокол FT4 использует уже 4 массива Костаса 4*4
со значениями: [[0, 1, 3, 2], [1, 0, 2, 3], [2, 3, 1, 0], [3, 2, 0, 1]]
.
Примечание: причина выбора массива Костаса со значениями [3, 1, 4, 0, 6, 5, 2]
до конца не понятна, но возможно это отсылка, по причине схожести первых трех чисел из числа пи.
FT8_NN = 79
FT4_NN = 105
FT8_COSTAS_PATTERN = [3, 1, 4, 0, 6, 5, 2]
FT4_COSTAS_PATTERN = [
[0, 1, 3, 2],
[1, 0, 2, 3],
[2, 3, 1, 0],
[3, 2, 0, 1]
]
FT4_XOR_SEQUENCE = [
0x4A, # 01001010
0x5E, # 01011110
0x89, # 10001001
0xB4, # 10110100
0xB0, # 10110000
0x8A, # 10001010
0x79, # 01111001
0x55, # 01010101
0xBE, # 10111110
0x28, # 00101 [000]
]
def ft8_encode(payload: typing.ByteString) -> typing.Generator[int, None, None]:
a91 = ftx_add_crc(payload)
codeword = ftx_encode(a91)
mask = 0x80
i_byte = 0
for i_tone in range(FT8_NN):
if 7 > i_tone >= 0:
yield FT8_COSTAS_PATTERN[i_tone]
elif 43 > i_tone >= 36:
yield FT8_COSTAS_PATTERN[i_tone - 36]
elif 79 > i_tone >= 72:
yield FT8_COSTAS_PATTERN[i_tone - 72]
else:
bits3 = 0
for bit_or in [4, 2, 1]:
if codeword[i_byte] & mask:
bits3 |= bit_or
mask >>= 1
if mask == 0:
mask = 0x80
i_byte += 1
yield FT8_GRAY_MAP[bits3]
def ft4_encode(payload: typing.ByteString) -> typing.Generator[int, None, None]:
payload_xor = bytearray(b"\x00" * 10)
for i in range(10):
payload_xor[i] = payload[i] ^ FT4_XOR_SEQUENCE[i]
a91 = ftx_add_crc(payload_xor)
codeword = ftx_encode(a91)
mask = 0x80
i_byte = 0
for i_tone in range(FT4_NN):
if i_tone == 0 or i_tone == 104:
yield 0
elif 5 > i_tone >= 1:
yield FT4_COSTAS_PATTERN[0][i_tone - 1]
elif 38 > i_tone >= 34:
yield FT4_COSTAS_PATTERN[1][i_tone - 34]
elif 71 > i_tone >= 67:
yield FT4_COSTAS_PATTERN[2][i_tone - 67]
elif 104 > i_tone >= 100:
yield FT4_COSTAS_PATTERN[3][i_tone - 100]
else:
bits2 = 0
for bit_or in [2, 1]:
if codeword[i_byte] & mask:
bits2 |= bit_or
mask >>= 1
if mask == 0:
mask = 0x80
i_byte += 1
yield FT4_GRAY_MAP[bits2]
Генераторная функция ft8_encode
вызывает ранее рассмотренные функции вычисления контрольных сумм и LDPC кодирования, после чего формирует последовательность по схеме: S7 D29 S7 D29 S7
, где S
— данные массива Костаса, D
— данные сообщения после ранее преобразованные в код Грея. Выходными данными функции являются номера GFSK-тонов.
Пример данных, сформированные функцией ft8_encode
: 3140652 00000000100651431071150732373 3140652 35427373324062650244263575260 3140652
(блоки данных и массивы Костаса разделены пробелами).
Работа генераторной функции ft4_encode
аналогична работе функции ft8_encode, с той лишь разницей, что кодируемые данные предварительно подвергаются операции XOR
в соответствии со схемой FT4_XOR_SEQUENCE
, такое повышение энтропии необходимо для того, чтобы свести к минимуму ситуацию, когда в сообщении идет друг за другом большое количество нулей, что может негативно повлиять на помехоустойчивость.
Данные FT4 до и после операции XOR: 00000020587223930748
/ 4a5e8994e8f85ac6b960
.
Формирование результата осуществляется по схеме: R S4_1 D29 S4_2 D29 S4_3 D29 S4_4 R, где R — ramping symbol, равный 0.
Пример данных, сформированные функцией ft4_encode
: 0 0132 10331123303131102330223011332 1023 01332311302112123233233113233 2310 30302303033330213121320010313 3201 0
(блоки данных и массивы Костаса разделены пробелами)
FT8_NN
, FT4_NN
— общее количество тонов в последовательности сигнала FT8/FT4.
Поиск сигнала по массиву Костаса
В протоколах семейства FTX при детекции сигналов в общей полосе производится обход спектра временного окна и скоринг сигнала по амплитудам частот, расположенных согласно значениям из массива Костаса. При достижении порогового значения скоринга, полоса частот во временном промежутке принимается в расчет для последующего декодирования сигнала.
@dataclass
class Candidate:
time_offset: int
freq_offset: int
time_sub: int
freq_sub: int
score: int = 0
Класс Candidate
для хранения информации о положении сигнала и его скор-балла в общем сигнале.
time_offset
— индекс блока времени;
freq_offset
— индекс блока частот;
time_sub
— подиндекс блока времени при оверсемплинге;
freq_sub
— подиндекс блока частот при оверсемплинге.
def get_cand_mag_idx(self, candidate: Candidate) -> int:
wf = self.wf
offset = candidate.time_offset
offset = offset * wf.time_osr + candidate.time_sub
offset = offset * wf.freq_osr + candidate.freq_sub
offset = offset * wf.num_bins + candidate.freq_offset
return offset
def ft8_sync_score(self, candidate: Candidate) -> int:
wf = self.wf
score = 0
num_average = 0
mag_cand = self.get_cand_mag_idx(candidate)
for m in range(FT8_NUM_SYNC):
for k in range(FT8_LENGTH_SYNC):
block = FT8_SYNC_OFFSET * m + k
block_abs = candidate.time_offset + block
if block_abs < 0:
continue
if block_abs >= wf.num_blocks:
break
p8 = mag_cand + block * wf.block_stride
sm = FT8_COSTAS_PATTERN[k]
p8sm = p8 + sm
if sm > 0:
score += wf.mag[p8sm] - wf.mag[p8sm - 1]
num_average += 1
if sm < 7:
score += wf.mag[p8sm] - wf.mag[p8sm + 1]
num_average += 1
if k > 0 and block_abs > 0:
score += wf.mag[p8sm] - wf.mag[p8sm - wf.block_stride]
num_average += 1
if k + 1 < FT8_LENGTH_SYNC and block_abs + 1 < wf.num_blocks:
score += wf.mag[p8sm] - wf.mag[p8sm + wf.block_stride]
num_average += 1
if num_average > 0:
score = int(score / num_average)
return score
def ft4_sync_score(self, candidate: Candidate) -> int:
wf = self.wf
score = 0
num_average = 0
mag_cand = self.get_cand_mag_idx(candidate)
for m in range(FT4_NUM_SYNC):
for k in range(FT4_LENGTH_SYNC):
block = 1 + (FT4_SYNC_OFFSET * m) + k
block_abs = candidate.time_offset + block
if block_abs < 0:
continue
if block_abs >= wf.num_blocks:
break
p4 = mag_cand + (block * wf.block_stride)
sm = FT4_COSTAS_PATTERN[m][k]
p4sm = p4 + sm
if sm > 0:
score += wf.mag[p4sm] - wf.mag[p4sm - 1]
num_average += 1
if sm < 3:
score += wf.mag[p4sm] - wf.mag[p4sm + 1]
num_average += 1
if k > 0 and block_abs > 0:
score += wf.mag[p4sm] - wf.mag[p4sm - wf.block_stride]
num_average += 1
if k + 1 < FT4_LENGTH_SYNC and block_abs + 1 < wf.num_blocks:
score += wf.mag[p4sm] - wf.mag[p4sm + wf.block_stride]
num_average += 1
if num_average > 0:
score = int(score / num_average)
return score
def ftx_sync_score(self, candidate: Candidate) -> int:
wf = self.wf
if wf.protocol == FTX_PROTOCOL_FT4:
sync_fun = self.ft4_sync_score
elif wf.protocol == FTX_PROTOCOL_FT8:
sync_fun = self.ft8_sync_score
else:
raise ValueError("Invalid protocol")
return sync_fun(candidate)
def ftx_find_candidates(self, num_candidates: int, min_score: int) -> typing.List[Candidate]:
wf = self.wf
num_tones = FTX_TONES_COUNT[wf.protocol]
time_offset_range = range(-FTX_LENGTH_SYNC[wf.protocol], int(FTX_TIME_RANGE[wf.protocol]))
heap = []
can = Candidate(0, 0, 0, 0)
for time_sub in range(wf.time_osr):
for freq_sub in range(wf.freq_osr):
for time_offset in time_offset_range:
for freq_offset in range(wf.num_bins - num_tones):
can.time_sub = time_sub
can.freq_sub = freq_sub
can.time_offset = time_offset
can.freq_offset = freq_offset
if (score := self.ftx_sync_score(can)) < min_score:
continue
candidate = copy(can)
candidate.score = score
heap.insert(0, candidate)
heap.sort(key=lambda x: x.score, reverse=True)
return heap[:num_candidates]
Вспомогательная функция get_cand_mag_idx
переводит параметры частоты и времени кандидата в позицию в матрице амплитуд сигнала.
Функция ftx_find_candidates
осуществляет подбор и скоринг кандидатов, перебирая частоты и временные временные отметки, в которых потенциально может быть обнаружен сигнал. Кандидат передается в функцию ftx_sync_score
для анализа, если удается обнаружить признаки сигналов по массиву Костаса, то значение скор-балла увеличивается.
Результатом работы функции является список кандидатов (heap
), отсортированный по признаку score, исключая кандидатов, чей скор-балл ниже значения в min_score
; размер списка ограничен размером num_candidates
.
Функция ftx_sync_score
, в зависимости от протокола, FT8 или FT4, передает управление функциям ft4_sync_score
и ft8_sync_score
, которые в свою очередь осуществляют анализ сигнала на наличие маркеров-массивов Костаса.
Функции ft8_sync_score
и ft4_sync_score
реализуют обход участка частот кандидата, согласно расположениям массивов Костаса (позиции 0-7
, 36-43
, 72-79
для FT8 и 1-4
, 34-37
, 67-70
, 100-103
для FT4). В значении score суммируется разность амплитуд между искомыми и соседними частотами, иными словами, чем резче градиент амплитуд в полосе, тем выше скор-балл кандидата. Переменная wf.mag
(ссылка на self.wf
) — матрица амплитуд (WFFT) принятого сигнала.
Канальный уровень
На канальном уровне происходит синтезирование сигнала, на основе данных, сформированным на предыдущем уровне. В результате формируется дискретный сигнал, готовый для передачи по аналоговому (звуковому) каналу связи.
При приеме сигнала, на этом уровне осуществляется оконное преобразование Фурье и вычисление амплитуд.
FSK и GFSK формирование сигнала
FSK (Frequency Shift Keying) — вид модуляции (манипуляции), при котором информация кодируется изменением несущей частоты сигнала. Для кодирования информации может использоваться две или более частот, где каждому состоянию соответствует та или иная частота.
На рисунке 9 изображено представление сигнала с использованием FSK для бинарных данных, в частности единицам соответствует высокая частота, а нулям меньшая.

Рисунок 9: FSK для двоичных данных и вид сигнала.
Стоит обратить внимание на то, что в рассматриваемом примере фаза непрерывна, то есть не имеет скачков.
GFSK (Gaussian Frequency-Shift Keying) — подвид FSK манипуляции при которой используется фильтр Гаусса (рисунок 10) для сглаживания частотных перестроек.

Рисунок 10: Импульсная переходная функция фильтра Гаусса.
Принцип модуляции GFSK идентичен обычному FSK, с той разницей, что формируемые импульсы тонов предварительно проходят через фильтр Гаусса для сглаживания (плавно перестраивает фазу сигнала в начале и конце тона), что позволяет уменьшить ширину спектра сигнала, так как резкая перестройка частоты на спектрограмме выглядит как широкополосный выброс (рисунок 13).

Рисунок 11: Сравнение FSK и GFSK.
На рисунках 11 и 12 показано сравнение сигналов и спектров обычного FSK и того же сигнала с использовании фильтра Гаусса (GFSK).
Фильтр Гаусса описывается уравнением 1.

(1)
Где

, а BT
— коэффициент сглаживания,чем он больше, тем меньше сглаживание исходного импульса (рисунок 12).

Рисунок 12: Сравнение сглаживаний при изменении параметра BT.

Рисунок 13: Сравнение спектров FSK и GFSK сигналов.
В протокол FT8 используется ширина сглаживания BT
равная 2.0
, в протоколе FT4 — 1.0
.
import math
import typing
FT8_SYMBOL_PERIOD = 0.160
FT4_SYMBOL_PERIOD = 0.048
FT8_SYMBOL_BT = 2.0
FT4_SYMBOL_BT = 1.0
GFSK_K = math.pi * math.sqrt(2 / math.log(2))
def gfsk_pulse(n_spsym: int, symbol_bt: float) -> typing.Generator[float, None, None]:
for i in range(3 * n_spsym):
t = i / n_spsym - 1.5
arg1 = GFSK_K * symbol_bt * (t + 0.5)
arg2 = GFSK_K * symbol_bt * (t - 0.5)
val = (math.erf(arg1) - math.erf(arg2)) / 2
yield val
def synth_gfsk(symbols: typing.List[int], n_sym: int,
f0: float,
symbol_bt: float, symbol_period: float,
signal_rate: int) -> typing.Generator[float, None, None]:
n_spsym = int(0.5 + signal_rate * symbol_period)
n_wave = n_sym * n_spsym
hmod = 1.0
dphi_peak = 2 * math.pi * hmod / n_spsym
dphi = [2 * math.pi * f0 / signal_rate] * (n_wave + 2 * n_spsym)
pulse = list(gfsk_pulse(n_spsym, symbol_bt))
for i in range(n_sym):
ib = i * n_spsym
for j in range(3 * n_spsym):
dphi[j + ib] += dphi_peak * symbols[i] * pulse[j]
for j in range(2 * n_spsym):
dphi[j] += dphi_peak * pulse[j + n_spsym] * symbols[0]
dphi[j + n_sym * n_spsym] += dphi_peak * pulse[j] * symbols[n_sym - 1]
phi = 0.0
n_ramp = n_spsym // 8
for k in range(n_wave):
val = math.sin(phi)
phi = math.fmod(phi + dphi[k + n_spsym], 2 * math.pi)
if k < n_ramp or k >= n_wave - n_ramp:
i_ramp = (k if k < n_ramp else n_wave - k - 1)
env = (1 - math.cos(2 * math.pi * i_ramp / (2 * n_ramp))) / 2
val *= env
yield val
Генераторная функция gfsk_pulse
рассчитывает значения сглаживающего импульса.
Параметр n_spsym
— количество семплов на символ; параметр symbol_bt
— крутизна сглаживания (параметр BT
); результат — генератор со значениями типа float
.
Генераторная функция synth_gfsk
формирует дискретный сигнал, с частотой дискретизации задаваемой параметром signal_rate
, на вход передается список тонов в параметре symbols; параметр n_sym
определяет размер множества символов (значение FT4_NN
/FT8_NN
); параметр f0 определяет базовую частоту, относительно которой будет будут рассчитываться частоты тонов; symbol_bt
— крутизна фильтра Гаусса (FT4_SYMBOL_BT
/FT8_SYMBOL_BT
); параметр symbol_period
определяет длительность звучания каждого тона в секундах (FT4_SYMBOL_PERIOD
/FT8_SYMBOL_PERIOD
); функция генерирует последовательность дискретных значений от -1 до 1 используя функцию синус (sin), в качестве аргумента которого передается рассчитанная фаза сигнала.
pulse = list(gfsk_pulse(n_spsym, symbol_bt))
В pulse формируется список дискретных значений сглаживающей функции Гаусса.
При формировании сигнала дополнительно происходит сглаживание амплитуд в начале и конце сигнала оконной функцией Ханна:
if k < n_ramp or k >= n_wave - n_ramp:
i_ramp = (k if k < n_ramp else n_wave - k - 1)
env = (1 - math.cos(2 * math.pi * i_ramp / (2 * n_ramp))) / 2
val *= env
Таким образом, результатом работы функций канального уровня, является дискретный звуковой сигнал, готовый к отправке в ЦАП передатчика.
Общий вид кодера и генератора сигнала FT8/FT4:
import numpy as np
from scipy.io.wavfile import write
FT4_SLOT_TIME = 7.5
FT8_SLOT_TIME = 15.0
def main():
try:
call = "CQ R1ABC KO85"
payload = ftx_message_encode_std(*call.split())
except Exception as e:
print(f"Cannot parse message: {type(e)}")
return
print("Payload", ''.join('{:02x}'.format(x) for x in payload))
is_ft4 = False
if is_ft4:
tones = ft4_encode(payload)
else:
tones = ft8_encode(payload)
tones = list(tones)
print("FSK tones:", "".join(str(i) for i in tones))
frequency = 1000
symbol_period = FT4_SYMBOL_PERIOD if is_ft4 else FT8_SYMBOL_PERIOD
symbol_bt = FT4_SYMBOL_BT if is_ft4 else FT8_SYMBOL_BT
sample_rate = 12000
num_tones = FT4_NN if is_ft4 else FT8_NN
slot_time = FT4_SLOT_TIME if is_ft4 else FT8_SLOT_TIME
num_samples = int(0.5 + num_tones * symbol_period * sample_rate)
num_silence = int((slot_time * sample_rate - num_samples) / 2)
signal = np.fromiter(synth_gfsk(tones, num_tones, frequency, symbol_bt, symbol_period, sample_rate), dtype=float)
silence = np.zeros(num_silence)
amplitude = np.iinfo(np.int16).max
data = np.concat([silence, amplitude * signal, silence])
write("examples/signal.wav", sample_rate, data.astype(np.int16))
if __name__ == '__main__':
main()
Приведенный код формирует из строки «CQ R1ABC KO85
» осуществляет генерацию сигнала протокола FT8 и сохраняет сигнал в файл с именем signal.wav
.
Изменение значения переменной is_ft4 на True изменит логику генератора на формирование сигнала протокола FT4.
Результат вывода данных для FT8:
Payload 00000020587223930748
FSK tones: 3140652000000001006514310711507323733140652354273733240626502442635752603140652
Результат вывода данных для FT4:
Payload 00000020587223930748
FSK tones: 001321033112330313110233022301133210230133231130211212323323311323323103030230303333021312132001031332010
Декодирование сигнала
При декодировании входящий сигнал разделяется на блоки, размером в период символа протокола (для FT8 0.160 и FT4 0.048 сек), суммарной продолжительностью во временной слот протокола (15.0 и 7.5 секунд для FT8 и FT4 соответственно); далее каждый блок передается в оконное преобразование Фурье (WFFT, рисунок 14) с использованием оконной функции (например Ханна, рисунок 15 и формула 2), магнитуды частот преобразуются в амплитуды в децибелах.

Рисунок 14: Представление преобразования Фурье.

Рисунок 15: График оконной функции Ханна.

(2)
Дальнейшая обработка сигнала заключается в поиске сигналов-кандидатов (поиск по массиву Костаса), нормализация амплитуд сигнала кандидата и передача в LDPC-декодер (алгоритм Belief Propagation), то есть вверх по иерархии уровней абстрагирования.
Логику обработки сигнала можно инкапсулировать в класс Monitor:
kMin_score = 5
kMax_candidates = 140
kLDPC_iterations = 25
kMaxLDPCErrors = 32
@dataclass
class Waterfall:
num_bins: int
time_osr: int
freq_osr: int
protocol: int
mag = typing.List[int]
max_blocks: int
num_blocks: int = 0
block_stride: int = 0
def __post_init__(self):
self.block_stride = (self.time_osr * self.freq_osr * self.num_bins)
self.mag = [0] * (self.max_blocks * self.time_osr * self.freq_osr * self.num_bins)
class Monitor:
@staticmethod
def hann_i(i: int, N: int) -> float:
x = math.sin(math.pi * i / N)
return x ** 2
@staticmethod
def ftx_normalize_logl(log174: typing.List[float]) -> typing.Generator[float, None, None]:
...
def __init__(self, f_min: int, f_max: int, sample_rate: int, time_osr: int, freq_osr: int, protocol):
slot_time = FTX_SLOT_TIMES[protocol]
symbol_period = FTX_SYMBOL_PERIODS[protocol]
self.block_size = int(sample_rate * symbol_period)
self.subblock_size = int(self.block_size / time_osr)
self.nfft = self.block_size * freq_osr
self.fft_norm = 2.0 / self.nfft
self.window = [self.fft_norm * self.hann_i(i, self.nfft) for i in range(self.nfft)]
self.last_frame = [0.0] * self.nfft
max_blocks = int(slot_time / symbol_period)
self.min_bin = int(f_min * symbol_period)
self.max_bin = int(f_max * symbol_period + 1)
num_bins = self.max_bin - self.min_bin
self.wf = Waterfall(max_blocks=max_blocks, num_bins=num_bins, time_osr=time_osr, freq_osr=freq_osr,
protocol=protocol)
self.symbol_period = symbol_period
self.max_mag = -120.0
def monitor_process(self, frame: typing.List[float]):
if self.wf.num_blocks >= self.wf.max_blocks:
return False
offset = self.wf.num_blocks * self.wf.block_stride
frame_pos = 0
for time_sub in range(self.wf.time_osr):
for pos in range(self.nfft - self.subblock_size):
self.last_frame[pos] = self.last_frame[pos + self.subblock_size]
for pos in range(self.nfft - self.subblock_size, self.nfft):
self.last_frame[pos] = frame[frame_pos]
frame_pos += 1
timedata = [self.window[pos] * self.last_frame[pos] for pos in range(self.nfft)]
freqdata = np.fft.fft(timedata)[:self.nfft // 2 + 1]
for freq_sub in range(self.wf.freq_osr):
for bin in range(self.min_bin, self.max_bin):
src_bin = (bin * self.wf.freq_osr) + freq_sub
mag2 = freqdata[src_bin].imag ** 2 + freqdata[src_bin].real ** 2
db = 10.0 * math.log10(1E-12 + mag2)
scaled = int(2 * db + 240)
self.wf.mag[offset] = max(min(scaled, 255), 0)
offset += 1
self.max_mag = max(self.max_mag, db)
self.wf.num_blocks += 1
return True
def ft8_sync_score(self, candidate: Candidate) -> int:
...
def ft4_sync_score(self, candidate: Candidate) -> int:
...
def ftx_sync_score(self, candidate: Candidate) -> int:
...
def ftx_find_candidates(self, num_candidates: int, min_score: int) -> typing.List[Candidate]:
...
def get_cand_mag_idx(self, candidate: Candidate) -> int:
...
def ft4_extract_likelihood(self, cand: Candidate) -> typing.List[float]:
log174 = [0.0] * FTX_LDPC_N
mag = self.get_cand_mag_idx(cand)
for k in range(FT4_ND):
sym_idx = k + (5 if k < 29 else 9 if k < 58 else 13)
bit_idx = 2 * k
block = cand.time_offset + sym_idx
if block < 0 or block >= self.wf.num_blocks:
log174[bit_idx + 0] = 0
log174[bit_idx + 1] = 0
else:
logl_0, logl_1 = self.ft4_extract_symbol(mag + sym_idx * self.wf.block_stride)
log174[bit_idx + 0] = logl_0
log174[bit_idx + 1] = logl_1
return log174
def ft8_extract_likelihood(self, cand: Candidate) -> typing.List[float]:
log174 = [0.0] * FTX_LDPC_N
mag = self.get_cand_mag_idx(cand)
for k in range(FT8_ND):
sym_idx = k + (7 if k < 29 else 14)
bit_idx = 3 * k
block = cand.time_offset + sym_idx
if block < 0 or block >= self.wf.num_blocks:
log174[bit_idx + 0] = 0
log174[bit_idx + 1] = 0
log174[bit_idx + 2] = 0
else:
logl_0, logl_1, logl_2 = self.ft8_extract_symbol(mag + sym_idx * self.wf.block_stride)
log174[bit_idx + 0] = logl_0
log174[bit_idx + 1] = logl_1
log174[bit_idx + 2] = logl_2
return log174
def ft4_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float]:
...
def ft8_extract_symbol(self, mag_idx: int) -> typing.Tuple[float, float, float]:
...
def ftx_decode_candidate(
self, cand: Candidate,
max_iterations: int) -> typing.Optional[typing.Tuple[DecodeStatus, typing.Optional[bytes], float]]:
wf = self.wf
if wf.protocol == FTX_PROTOCOL_FT4:
log174 = self.ft4_extract_likelihood(cand)
else:
log174 = self.ft8_extract_likelihood(cand)
log174 = list(self.ftx_normalize_logl(log174))
ldpc_errors, plain174 = bp_decode(log174, max_iterations)
if ldpc_errors > kMaxLDPCErrors:
return None
if not ftx_check_crc(plain174):
return None
a91 = self.pack_bits(plain174, FTX_LDPC_K)
crc_extracted = ftx_extract_crc(a91)
if wf.protocol == FTX_PROTOCOL_FT4:
payload = bytearray(a91[i] ^ xor for i, xor in enumerate(FT4_XOR_SEQUENCE))
tones = ft4_encode(payload)
else:
payload = a91
tones = ft8_encode(payload)
snr = self.ftx_subtract(cand, tones)
return DecodeStatus(ldpc_errors, crc_extracted), payload, snr
def decode(self) -> typing.Generator[typing.Tuple[float, float, float, str], None, None]:
hashes = set()
wf = self.wf
candidate_list = self.ftx_find_candidates(kMax_candidates, kMin_score)
for cand in candidate_list:
freq_hz = (self.min_bin + cand.freq_offset + cand.freq_sub / wf.freq_osr) / self.symbol_period
time_sec = (cand.time_offset + cand.time_sub / wf.time_osr) * self.symbol_period - 0.65
if not (x := self.ftx_decode_candidate(cand, kLDPC_iterations)):
continue
status, message, snr = x
if (crc := status.crc_extracted) in hashes:
continue
hashes.add(crc)
call_to_rx, call_de_rx, extra_rx = ftx_message_decode(message)
yield snr, time_sec, freq_hz, " ".join([call_to_rx, call_de_rx or "", extra_rx or ""])
Генераторная функция decode
инкапсулирует логику подбора кандидатов и их декодирование ранее описанными функциями.
Функция выдает кортеж с данными о SNR, отклонении времени и декодированные сообщения в текстовом виде.
Функция ftx_decode_candidate
принимает на вход сигнал-кандидат, вызывает методы ft4_extract_likelihood
/ft8_extract_likelihood
; данные извлеченные из сигнала, нормализованные функцией ftx_normalize_logl
, передаются в LPDC-декодер (bp_decode
), после чего проверяется CRC-14 (ftx_check_crc
) и на основе декодированных данных декодируется сообщение.
Функции ft8_extract_likelihood
и ft4_extract_likelihood
извлекают биты данны из общего скопа сигналов, формируя список «аналоговых» бит.
Функция monitor_process
реализует аккумулятор сигнала, принимая на вход блоки принятого дискретного сигнала, производя над ними преобразование Фурье и сохраняет результаты в «водопад» (структура Waterfall
).
Параметрами конструктора класса Monitor являются ширина полосы частот принимаемого сигнала (f_min
, f_max
), частота дискретизации (sample_rate
), параметры оверсемплинга (time_osr
, freq_osr
) и идентификатор протокола FT8/FT4.
Полный пример декодера с использованием класса Monitor:
import time
import numpy as np
from scipy.io.wavfile import read
from consts import FTX_PROTOCOL_FT8, FTX_PROTOCOL_FT4
from decode import Monitor
kFreq_osr = 2
kTime_osr = 4
def main():
is_ft4 = False
sample_rate, data = read("examples/signal.wav")
amplitude = np.iinfo(data.dtype).max
signal = data / amplitude
protocol = FTX_PROTOCOL_FT4 if is_ft4 else FTX_PROTOCOL_FT8
mon = Monitor(
f_min=200,
f_max=3000,
sample_rate=sample_rate,
time_osr=kTime_osr,
freq_osr=kFreq_osr,
protocol=protocol
)
frame_pos = 0
while True:
eof = frame_pos >= len(signal) - mon.block_size
if eof or not mon.monitor_process(signal[frame_pos:frame_pos + mon.block_size]):
print(f"Waterfall accumulated {mon.wf.num_blocks} symbols")
print(f"Max magnitude: {mon.max_mag:+.2f} dB")
tm_slot_start = 0
ts1 = time.monotonic()
for i, (snr, time_sec, freq_hz, text) in enumerate(mon.decode(tm_slot_start)):
print(
f"{i + 1:03}\t"
f"{snr:+06.2f}dB\t"
f"{time_sec:-.2f}sec\t"
f"{freq_hz:.2f}Hz\t"
f"{text}"
)
mon.wf.num_blocks = 0
mon.max_mag = -120.0
ts2 = time.monotonic()
print("-" * 20, "decoded @", ts2 - ts1, "sec")
if eof:
break
frame_pos += mon.block_size
if __name__ == '__main__':
main()
На рисунках 16 и 17 приведены спектрограмма формируемых сигналов для FT8 и FT4.

Рисунок 16: Спектрограмма сигнала FT8.

Рисунок 17: Спектрограмма сигнала FT4.
Расчет SNR
Один из способов вычисления SNR — определить амплитуды сигнала, исключить из полосы сигнала сам сигнал и просуммировать амплитуды в его окрестностях.
from copy import copy
from itertools import cycle
def ftx_subtract(self, candidate: Candidate, tones: typing.Iterable[int]) -> float:
num_tones = FTX_TONES_COUNT[self.wf.protocol]
can = copy(candidate)
snr_all = 0.0
tones = cycle(tones)
for freq_sub in range(self.wf.freq_osr):
can.freq_sub = freq_sub
mag_cand = self.get_cand_mag_idx(can)
noise = 0.0
signal = 0.0
num_average = 0
for i, tone in enumerate(tones):
block_abs = candidate.time_offset + i
if block_abs < 0:
continue
if block_abs >= self.wf.num_blocks:
break
wf_el = mag_cand + i * self.wf.block_stride
noise_val = 100000.0
for s in filter(lambda x: x != tone, range(num_tones)):
noise_val = min(noise_val, self.wf.mag[wf_el + s] * 0.5 - 120.0)
noise += noise_val
signal += self.wf.mag[wf_el + tone] * 0.5 - 120.0
num_average += 1
noise /= num_average
signal /= num_average
snr = signal - noise
for i, tone in enumerate(tones):
block_abs = candidate.time_offset + i
if block_abs < 0:
continue
if block_abs >= self.wf.num_blocks:
break
wf_el = mag_cand + i * self.wf.block_stride
self.wf.mag[wf_el + tone] -= snr * 2 + 240
snr_all += snr
return snr_all / self.wf.freq_osr / 2 - 22
Функция ftx_subtract
(метод класса Monitor
) перебирает тона исходного сигнала, вычисляет их амплитуды и амплитуды полосы сигнала, исключая амплитуды самих тонов. Функция возвращает результат деления уровня сигнала на уровень шумов. Побочным эффектом функции является удаление сигнала из водопада, что влияет на чувствительность декодирования соседних слабых сигналов.
Заключение
В статье были рассмотрены протоколы FT8 и FT4, их внутреннее устройство и принципы работы. Так, используя относительно простые и давно известные методы кодирования, реализуется достаточно помехоустойчивый протокол, позволяющий устанавливать дальние радиолюбительские связи. Также протоколы представляют интерес тем, что воплощают в себе принципы из таких областей как линейное кодирование и коррекция ошибок, радиолокация и цифровая обработка сигналов, частотная манипуляция, что также может являться предметом углубленного изучения.
Один из недостатков протоколов, подобных FT8/FT4, является тот факт, что радиообмен осуществляется в автоматическом режиме (в режиме модема), а полезными данными является только обмен метаданными в виде позывных, геолокации и уровнем SNR, хотя при этом протокол поддерживает обмен произвольными сообщениями очень малой длины и учитывая скорость обмена, работа в режиме чата достаточно затруднительная.
Так как архитектура протоколов представляет собой «матрешку» или «луковицу», технология может быть использована как базис для радиолюбительских экспериментов с передачей информации на большие расстояния.
Ссылки
Комментарии (10)
NutsUnderline
15.07.2025 17:54сразу захотелось посмотреть какая кодовая база для всего этого была в оригинале
vesowoma
Не понял этого момента. Радиолюбители обычно используют диапазоны сильно выше 20 кГц, что могут слышать люди, и в приемниках происходит преобразование передаваемого сигнала в аудиосигнал или визуальную информацию (как например SSTV)
Vindicar
Я полагаю, имеется ввиду, что данные, переданные по этому протоколу можно восстановить даже при таком соотношении сигнал-шум, при котором простая речь уже будет неразличима.
bashkirtsevich Автор
Всё верно. В контрпример еще приводят телеграф, его человек еще может различить там, где неразличима речь; протоколы FT8/FT4 обрабатываются программно, что позволяет превзойти чувствительность человека.
bashkirtsevich Автор
Здесь имеется ввиду порог слышимости в децибелах, когда сигнал настолько слабый, что человеческое ухо не может отличить его от фонового (эфирного) шума. Сами сигналы FT8/FT4 передаются на слышимых человеком частотах, то есть можно настроить приемник, например, на 14.074 МГц USB и услышать их.
vesowoma
Спасибо, теперь понятно. Просто термин "порог слышимости" для протокола, где обработка ведется не ушами и мозгом, а при помощи технических средств, ввел в заблуждение.
Astroscope
А если прислонить к динамику приемника микрофон телефона с программой-декодером, то еще и увидеть, что именно передается. Для не-радиолюбителей этот обмен едва ли интересен, потому что, как вполне очевидно описано в статье, передаваемая информация состоит из позывных, примерных координат (локаторов), отчетов о силе (скорее об SNR) принимаемых сигналов, и из служебных флагов подтверждения обмена - традиционных для связистов "RRR" (собственно подтверждение) и "73" ("наилучшие пожелания", почти всегда передается в конце сеанса связи и поэтому дилетантами ошибочно понимается как "до свидания").