
WSPR — цифровой протокол, разработанный Джо Тейлором (K1JT) в 2008-2009 годах, с целью исследования распространения радиосигналов от коротковолновых передатчиков малой и сверхмалой мощности. В предыдущей части были рассмотрены механизмы помехозащищенного кодирования данных и формирования сигнала для отправки его в эфир. В этой части статьи рассматриваются механизмы детектирования и декодирования принимаемого из сигнала.
Статья может быть интересна радиолюбителям, как знакомым, так и не знакомым с WSPR, а также тем, кто интересуется темой цифровой обработки сигналов и хочет понять устройство этого протокола.
Общее представление
В первой части были рассмотрены принципы и алгоритмы, задействованные в процессе формировании сигнала.

Процесс детекции и декодирования по своей сути диаметрально противоположен процессу кодирования. Обрабатываемый сигнал предварительно подвергается фильтрации даунсемплингу и фильтрации в частотной области, после чего осуществляется процесс синхронизации сигнала во временной области (для чего используются согласованные фильтры), что позволяет извлечь символы данных; извлеченные символы затем подвергаются декодированию с коррекцией ошибок, что уже позволяет получить на выходе передаваемое WSPR-маяком сообщение.
Перенос спектра и фильтрация
Входящий звуковой сигнал для обработки в WSPR поступает с частотой дискретизации 12.0 кГц, при этом центральная частота сигнала располагается в зоне 1.5 кГц; так как сигнал накапливается в течении 110 секунд, формируется достаточно большой объем звуковых данных, из которых только нужна полоса шириной self.sample_rate / WSPR_DECIMATION = 12000 / 32 = 375 Гц, всё что за пределами этой полосы — бесполезный шум.
Даунсемплинг и децимация сигнала в WSPR осуществляются в частотной области через прямое и обратное преобразование Фурье.
WSPR_DECIMATION = 32 WSPR_CENTER_FREQ = 1500 class WSPRMonitor(AbstractMonitor): __slots__ = [ "sample_rate", "signal", ... ] def __init__(self, sample_rate: int): self.signal = np.zeros(0, dtype=np.float64) self.sample_rate = sample_rate ... def downsample(self) -> npt.NDArray[np.complex128]: fft_target = 46080 fft_src = fft_target * WSPR_DECIMATION frame = np.pad(self.signal[:fft_src], (0, max(0, fft_src - len(self.signal)))) spec = np.fft.rfft(frame) df = self.sample_rate / fft_src i0 = int(WSPR_CENTER_FREQ / df + 0.5) sub_spec = spec[i0: i0 + fft_target] signal = np.fft.ifft(sub_spec) * (fft_target / fft_src) * 2 return signal.astype(np.complex128) ... def monitor_process(self, frame: npt.NDArray[np.int16]) -> None: frame_float = frame.astype(np.float32) / np.iinfo(np.int16).max self.signal = np.concat([self.signal, frame_float]) ...
Поле sample_rate хранит информацию о частоте дискретизации исходного сигнала, в signal накапливаются данные самого сигнала.
Функция downsample осуществляет перенос спектра из области высоких частот в область низких, выполняя децимацию сигнала.
В переменную spec записывается результат преобразования Фурье для вещественной части сигнала.
Примечание: то есть в спектре содержатся только положительные частоты, область с отрицательными частотами зеркально симметрична и не используется (рисунок 2).

Переменная df определяет целевой частотный размер Фурье-бинов после преобразования.
Переменная i0 задает параметры фильтра в частотной области, — индекс частотного бина в исходном спектре, относительно которого формируется полоса пропускания; полоса пропускания ограничивается значением fft_target.
Массив данных в sub_spec — перенесенный и отфильтрованный по частоте спектр сигна. Переход из частотной во временную область осуществляется обратным преобразованием Фурье; сигнал дополнительно подвергается нормализации амплитуд (AGC).
При использовании обратного преобразования Фурье, полученный сигнал формируется в виде массива комплексных чисел, — то есть представляет собой сигнал в I/Q форме (что эквивалентно записи Эйлера в формате комплексной экспоненты
).
Таким образом, перенос спектра и даунсемплинг позволяет осуществить децимацию без потерь, использовать меньшую частоту дискретизации, что значительно уменьшает объем данных, обработка которых потребует гораздо меньшего количества ресурсов и упрощает процесс анализа сигнала; также в частотной области происходит исключение высокочастотных и внеполосных шумов, что положительно влияет на SNR.
Детекция сигнала
Процесс декодирования сигнала начинается с его детекции. Функция decode получает I/Q сигнал после даунсемплинга.
class WSPRMonitor(AbstractMonitor): __slots__ = [ ... "symbol_len", "df", "df2", "dt", "decode_passes", ... ] ... def __init__(self, sample_rate: int): ... down_sample_rate = self.sample_rate / WSPR_DECIMATION self.symbol_len = int(down_sample_rate * WSPR_SYMBOL_PERIOD) self.df = down_sample_rate / self.symbol_len self.df2 = self.df / 2 self.dt = 1 / down_sample_rate self.decode_passes = 3 ... def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]: iq_signal = self.downsample() symbols = np.zeros(WSPR_NUM_BITS * 2, dtype=np.uint8) fft_step = 128 fft_size = fft_step * 4 fft_count = 4 * (len(iq_signal) // fft_size) - 1 smooth = np.sin((np.pi / fft_size) * np.arange(fft_size)) pwr_spec = np.zeros((fft_count, fft_size)) block_size = 1 max_drift = 4 min_sync_2 = WSPR_MIN_SYNC_2 decodes_pass = 0 ...
Определяются параметры построения спектра I/Q сигнала — fft_step, fft_size, fft_count. Подготавливается сглаживающая функция для оконного преобразования Фурье.
Примечание: вместо окна Ханна Стив Франки (K9AN) применил положительный полупериод функции синус. В отличие от функции Ханна, функция синус чуть хуже подавляет боковые лепестки в спектре.
WSPR_MIN_SYNC_1 = 0.10 WSPR_MIN_SYNC_2 = 0.12 ...
Константы WSPR_MIN_SYNC_1, WSPR_MIN_SYNC_2 определяют параметры минимального отклонения частоты при синхронизации.
Основной цикл обработки сигнала:
... for iteration in range(self.decode_passes): if iteration == 1 and decodes_pass == 0 and self.decode_passes > 2: iteration = 2 if iteration == 2: block_size = 4 max_drift = 0 min_sync_2 = WSPR_MIN_SYNC_1 decodes_pass = 0 ...
Усреднение спектра
В WSPR принимаемые сигналы могут иметь очень низкий показатель SNR, что делает их практически невидимыми. В WSPR для выделения слабых сигналов применяется алгоритм усреднения спектра накопленных данных. Так как шум в большинстве случаев является случайным процессом, с нулевым мат. ожиданием и не коррелирует друг с другом, а полезный сигнал находится на неизменной частоте и имеет неразрывную фазу, то при сложении спектров мощность сигнала будет расти линейно, в такое число раз, кратное количеству усреднений, в то время как спектр шума будет расти по закону случайных блужданий, — пропорционально квадратному корню из количества усреднений ().
На рисунке 3 приведен пример усреднения спектра сигнала в канале с аддитивным Гауссовским шумом при соотношении сигнал/шум SNR=-30 дБ. На первом графике длительность сигнала частотой 1.0 кГц (положение в спектре выделено красным пунктиром) составляет 1с, на второ — 10с и 30с на третьем соответственно. Как видно из графиков, при десятикратном усреднении спектра, полезный сигнал начинает проявляться относительно шума, то есть SNR начинает расти. При еще более долгом накоплении спектра сигнал начинает еще сильнее преобладать над шумом.

... for i in range(fft_count): start = i * fft_step segment = iq_signal[start: start + fft_size] if len(segment) < fft_size: segment = np.pad(segment, (0, fft_size - len(segment)), mode="constant") cpx_spec = np.fft.fft(segment * smooth) cpx_spec = np.fft.fftshift(cpx_spec) pwr_spec[i, :] = np.abs(cpx_spec) ** 2 pwr_spec_avg = np.mean(pwr_spec, axis=0) * fft_count center = fft_size // 2 span = 205 relevant_part = pwr_spec_avg[center - span: center + span + 1] smooth_spec = np.convolve(relevant_part, WSPR_SMOOTH_KERNEL, mode="same") noise_level = np.percentile(smooth_spec, WSPR_NOISE_PERCENTILE) min_snr = np.pow(10.0, -8.0 / 10.0) smooth_spec = smooth_spec / noise_level - 1.0 smooth_spec[smooth_spec < min_snr] = 0.1 * min_snr ...
Исходя из того условия, что длительность одного символа WSPR составляет 0.6827 с, демодулятору, используя усреднение спектра, удается извлекать сигналы с низким значением SNR.
Поиск кандидатов
Следующий этап после усреднения спектра — анализ амплитуд в полученном спектре, заключающийся в нахождении локальных максимумов спектра. Каждый найденный локальный максимум является признаком потенциального искомого сигнала.
WSPR_SNR_SCALING_FACTOR = 26.3 ... def dB(x: float) -> float: val = -99.0 if x > 1.259e-10: x = max(x, 0.000001) val = 10.0 * np.log10(x) return val ... @dataclass class Candidate: freq: float snr: float shift: int = 0 drift: float = 0 sync: float = 0 ... class WSPRMonitor(AbstractMonitor): ... MAX_CANDIDATES = 410 CANDIDATE_FREQ_MIN = -110 CANDIDATE_FREQ_MAX = 110 ... def find_candidates(self, smooth_spec: npt.NDArray[np.float64]) -> typing.List[Candidate]: spec_slice = smooth_spec[:self.MAX_CANDIDATES] is_max = (spec_slice[1:-1] > spec_slice[:-2]) & (spec_slice[1:-1] > spec_slice[2:]) indices = np.where(is_max)[0] + 1 freqs = (indices - 205) * self.df2 mask = (freqs >= self.CANDIDATE_FREQ_MIN) & (freqs <= self.CANDIDATE_FREQ_MAX) cand_indices = indices[mask] cand_freqs = freqs[mask] snrs = [dB(ss) - WSPR_SNR_SCALING_FACTOR for ss in smooth_spec[cand_indices]] heap = [Candidate(freq=freq, snr=snr) for freq, snr in zip(cand_freqs, snrs)] heap.sort(key=lambda it: it.snr, reverse=True) return heap
Функция find_candidates анализирует данные в спектре smooth_spec, находит в нем частотные бины, удовлетворяющие правилу, что соседние слева и справа от анализируемого бины имеют меньшее значение амплитуды. Массив is_max содержит логические значения, определяющие какой из бинов удовлетворяет условию. В массивы indices и freqs накапливаются индексы и значения частот бинов-кандидатов.
Для cand_indices и cand_freqs осуществляется фильтрация, исключающая кандидатов, частоты которых сильно отклоняются от центральной; на основе данных из cand_indices cand_freqs формируется список объектов для описания кандидата для последующего декодирования. Сформированный список отсортирован по возрастанию SNR.
Так, для тестового WSPR сигнала, сформированного в рамках предыдущей части, функция find_candidates выдаст результат:
[Candidate(freq=np.float64(2.197265625), snr=np.float64(43.2654156005124), shift=0, drift=0, sync=0)].
Так как анализируемый сигнал не имеет шумов, значение SNR принимает достаточно высокое значение.
Синхронизация
После нахождения частот кандидатов осуществляется синхронизация. Для каждого кандидата выполняется грубый поиск по частоте и времени кросс-корреляции с вектором синхронизации WSPR_PR3_SIG.
Процесс синхронизации разделен на 2 стадии: грубая и точная синхронизация.
Грубая синхронизация
class WSPRMonitor(AbstractMonitor): ... def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]: ... candidates = self.find_candidates(smooth_spec) for cand in candidates: smax = -1e30 if0 = int(cand.freq / self.df2 + self.symbol_len) for ifr in range(if0 - 2, if0 + 2 + 1): for k0 in range(-10, 22): for drift in range(-max_drift, max_drift + 1): ss = 0.0 pow = 0.0 for k in range(WSPR_ND): ifd = int(ifr + (k - WSPR_NUM_BITS) / WSPR_NUM_BITS * drift / (2.0 * self.df2)) k_idx = k0 + 2 * k if k_idx >= 0 and k_idx < fft_count: pwrs = pwr_spec[k_idx, ifd - 3: ifd + 4: 2] pwrs_sqrt = np.sqrt(pwrs) ss += WSPR_PR3_SIG[k] * pwrs_sqrt.dot([-1, 1, -1, 1]) pow += np.sum(pwrs_sqrt) sync1 = ss / pow if sync1 > smax: smax = sync1 cand.shift = 128 * (k0 + 1) cand.drift = drift cand.freq = (ifr - self.symbol_len) * self.df2 cand.sync = sync1 ...
Получив список кандидатов, осуществляется грубый поиск сигнала по частоте (цикл ifr), времени (k0) и дрейфу (drift) относительно корреляции с вектором синхронизации WSPR_PR3_SIG. Перебор частот осуществляется в диапазоне от -2 до +2 Гц относительно центральной; диапазон перебора времени ограничен в пределах 8 символов, что составляет примерно 5.4 сек; дрейф (плавное смещение частоты тонов на всем протяжении сигнала, возникновение которого может происходить вследствие изменения свойств генератора опорной частоты передатчика во время работы, при нагреве) в пределах от -4 до +4 Гц.
Вычисление корреляции осуществляется скалярным произведением вектора синхронизации и спектра pwr_spec (ss += WSPR_PR3_SIG[k] * pwrs_sqrt.dot([-1, 1, -1, 1])) по схеме (b+d)-(a+c). Для нормализации значения корреляции значение делится на общую мощность pwrs_sqrt.
Наибольшее значение корреляции считается самым лучшим совпадением, при достижении которого происходит запись параметров shift, drift, freq и sync.
Кандидаты, у которых одинаковые значения частот и лагов, считаются одинаковыми, дубликаты удаляются.
... cands = 0 for j in range(len(candidates)): dupe = False for k in range(cands): if abs(candidates[j].freq - candidates[k].freq) < 0.05 and abs( candidates[j].shift - candidates[k].shift) < 16: dupe = True break if dupe: if candidates[j].sync > candidates[k].sync: candidates[k] = candidates[j] elif candidates[j].sync > min_sync_2: candidates[cands] = candidates[j] cands += 1 ...
Точная синхронизация
После грубой оценки точности сигнала по корреляции с вектором синхронизации осуществляется процесс точной синхронизации с сигналом.
Для каждого из кандидатов поэтапно происходит уточнение времени, после чего уточняется частота.
... WSPR_MIN_SYNC_1 = 0.10 WSPR_MIN_SYNC_2 = 0.12 WSPR_ND = 162 ...
... for cand in candidates: f1 = cand.freq drift1 = cand.drift shift1 = cand.shift f_step = 0.0 f_min = 0 f_max = 0 lag_min = shift1 - 128 lag_max = shift1 + 128 lag_step = 64 sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step, drift1, 0) f_step = 0.25 f_min = -2 f_max = 2 sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step, drift1, 1) if iteration < 2: f_step = 0.0 f_min = 0 f_max = 0 driftp = drift1 + 0.5 syncp, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step, driftp, 1) driftm = drift1 - 0.5 syncm, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step, driftm, 1) if syncp > sync1: drift1 = driftp sync1 = syncp elif syncm > sync1: drift1 = driftm sync1 = syncm if sync1 > WSPR_MIN_SYNC_1: lag_min = shift1 - 32 lag_max = shift1 + 32 lag_step = 16 sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step, drift1, 0) f_step = 0.05 f_min = -2 f_max = 2 sync1, shift1, f1 = self.sync(iq_signal, f1, f_min, f_max, f_step, shift1, lag_min, lag_max, lag_step, drift1, 1) cand.freq = f1 cand.shift = shift1 cand.drift = drift1 cand.sync = sync1 ...
Относительно начального смещения cand.shift происходит уточнение (lag_step) в диапазоне от -128 до 128 семплов с шагами в 64.
После нахождения максимального совпадения по времени осуществляется уточнение частоты сигнала (f_step) с шагами 0.25 в диапазоне -2 +2 Гц относительно ранее зафиксированной частоты cand.freq.
Уточнение дрейфа производится в диапазоне от -0.5 до +0.5 Гц.
После уточнения значения sync1 сравнивается с пороговым значением WSPR_MIN_SYNC_1, при превышении которого производится еще один этап тонкой подстройки синхронизации, в пределах от -32 до 32 семплов с шагом 16 семплов для подстройки по времени и от -2 до +2 Гц с шагом 0.05 Гц для частоты.
Точная синхронизация сигнала осуществляется с помощью функции sync, реализующего частный случай согласованного фильтра.
WSPR_TONES_COUNT = 4 ... class WSPRMonitor(AbstractMonitor): ... def sync( self, iq_signal: npt.NDArray[np.complex128], f1: float, f_min: int, f_max: int, f_step: float, shift1: int, lag_min: int, lag_max: int, lag_step: int, drift1: float, mode: int ) -> typing.Tuple[float, int, float]: df_offsets = np.array([-1.5, -0.5, 0.5, 1.5]) * self.df two_pi_dt = 2 * np.pi * self.dt sync_max = -1e30 best_shift = shift1 f_best = f1 if mode == 0: f_min, f_max, f_step = 0, 0, 0.0 if mode == 1: lag_min, lag_max = shift1, shift1 tone_pwr = np.zeros(WSPR_TONES_COUNT, dtype=np.float64) for freq in range(f_min, f_max + 1): f0 = f1 + freq * f_step for lag in range(lag_min, lag_max + lag_step, lag_step): ss = 0.0 total_pwr = 0.0 for sym in range(WSPR_ND): fp = f0 + (drift1 / 2.0) * (sym - WSPR_NUM_BITS) / WSPR_NUM_BITS tone_pwr.fill(0) freqs = fp + df_offsets t_indices = np.arange(self.symbol_len) phasors = np.exp(-1j * two_pi_dt * np.outer(freqs, t_indices)) for tone in range(WSPR_TONES_COUNT): start = lag + sym * self.symbol_len end = start + self.symbol_len segment = iq_signal[max(0, start): min(len(iq_signal), end)] current_phasors = phasors[tone, :len(segment)] acc = np.dot(current_phasors, segment) tone_pwr[tone] = np.abs(acc) total_pwr += np.sum(tone_pwr) channel_metric = np.sum(tone_pwr[1::2]) - np.sum(tone_pwr[0::2]) ss += channel_metric * WSPR_PR3_SIG[sym] if total_pwr > 0: ss /= total_pwr if ss > sync_max: sync_max = ss best_shift = lag f_best = f0 return (sync_max, best_shift, f_best)
Функция sync осуществляет перебор частот и временных лагов; для каждого из 162 символов (WSPR_ND), ожидаемых в передаче, формируется массив комплексных амплитуд phasors, являющимися эталонными (опорными) значениями FSK-тонов. Значение комплексных амплитуд (рисунок 4) — фазоров — формируются для вычисления кросс-корреляции с анализируемым сигналом в I/Q-представлении.
Далее, для каждого из 4 (WSPR_TONES_COUNT) тонов происходит вычисление корреляции принятого сигнала из segment с эталонным из фазора. Значения частот рассчитываются относительно смещения центроальной частоты на величину из df_offsets. В массив total_pwr накапливается суммарная мощность сигнала, которая далее нормируется в ss. Значение ss определяет суммарный отклик согласованного фильтра.

Результат функции — тройка значений: значение максимального отклика (sync_max) и параметры частоты (f_best) и времени (best_shift), на которых этот отклик был достигнут.
Аргумент mode определяет тип перебора параметров, 0 — только по частоте, 1 — только по времени.
Некогерентный детектор
После точной синхронизации с сигналом в полосе, в WSPR осуществляется извлечение символов. Детектор назван некогерентным, так как фаза сигнала заранее неизвестна.
Функция noncoherent_sequence_detection реализует детектор сигнала. На вход поступает I/Q-сигнал (iq_signal), значение центральной частоты сигнала (f1), параметры лага (shift1) и частотного дрейфа (drift1).
class WSPRMonitor(AbstractMonitor): ... def noncoherent_sequence_detection( self, iq_signal: npt.NDArray[np.complex128], symbols: npt.NDArray[np.uint8], f1: float, shift1: int, drift1: float, sym_fac: int, block_size: int, bit_metric: bool ): df15 = self.df * 1.5 df05 = self.df * 0.5 ref_tones = np.zeros((WSPR_TONES_COUNT, WSPR_ND), dtype=np.complex128) phasor_ends = np.zeros((WSPR_TONES_COUNT, WSPR_ND), dtype=np.complex128) symb = np.zeros(WSPR_ND, dtype=np.float64) two_pi_dt = 2 * np.pi * self.dt f0 = f1 fp = f0 + (drift1 / 2.0) * (np.arange(WSPR_ND) - WSPR_NUM_BITS) / WSPR_NUM_BITS d_phi = two_pi_dt * np.array([fp - df15, fp - df05, fp + df05, fp + df15]) phases = d_phi[:, :, np.newaxis] * np.arange(self.symbol_len + 1) phasors = np.exp(1j * phases) ...
Выделение тонов из общего спектра осуществляется через построение согласованного фильтра, который, в данном случае, описывается 3-х мерной матрицей phases, где для каждого их тонов формируются эталонные (опорные) сигналы на всей продолжительности цикла передачи; т.е. (4, 162, 257), где 257 — количество сэмплов на 1 тон.
... lag = int(shift1) indices = lag + np.arange(WSPR_ND)[:, np.newaxis] * self.symbol_len + np.arange(self.symbol_len) indices = np.clip(indices, 0, len(iq_signal) - 1) segments = iq_signal[indices] segments[indices >= len(iq_signal)].fill(0) ref_tones[:, :WSPR_ND] += np.einsum('tfj,fj->tf', np.conj(phasors[:, :, :self.symbol_len]), segments) ...
Отклик фильтра вычисляется через суммирование матриц эталонов с сигналом. В данном случае используется многомерное суммирование через нотацию Эйнштейна — einsum, (запись tfj,fj->tf определяет умножение многомерных матриц и по каким осям оставить сумму результата. в данном случае t — ось тонов, f — ось частот, j — комплексные амплитуды сигналов) это позволяет получить результат для всех тонов сразу, в одно действие. Так как при этом осуществляется умножения на комплексно сопряженные значения (->
), то это можно назвать процессом гетеродинирования сигнала.
Примечание: использование многомерных матриц и их суммирование в python позволяет оптимизировать вычисления, т.к. numpy осуществляет эти операции через нативный код.
После этапа гетеродинирования тонов происходит формирование вероятностей гипотез о том какие символы находятся в сигнале.
... phasor_ends[:, :WSPR_ND] = phasors[:, :, -1] seq = 1 << block_size seq_weights = np.zeros(seq, dtype=np.float64) bit_mask = np.arange(seq) bit_exp = np.arange(block_size - 1, -1, -1) phases = np.ones(block_size, dtype=np.complex128) for i in range(0, WSPR_ND, block_size): time_block = np.arange(i, i + block_size) pr3_block = WSPR_PR3[i: i + block_size] ...
Исходный сигнал разделяется на блоки, продолжительность которых соответствует продолжительности одного символа WSPR, после чего применяет вектор синхронизации WSPR_PR3 к каждому из символов, чтобы локализовать частоты тонов, где должен располагаться символ.
Далее осуществляется перебор гипотез о том какие данные были переданы в символе. Алгоритм реализует метод максимального правдоподобия, перебирая значения бит, соответствующие возможным 4-FSK-символам.
... for j in range(seq): bits = (j >> bit_exp) & 1 tones = pr3_block + 2 * bits phasor_proj = ref_tones[tones, time_block] phasor_shifts = phasor_ends[tones, time_block] phases.fill(1 + 0j) phases[1:] = np.cumprod(phasor_shifts[:-1]) ...
Для каждой гипотезы осуществляется фазовая коррекция (phasor_ends) для когерентного суммирования общего отклика; то есть, если для гипотезы выбран подходящий символ, значение отклика усилится многократно. Коррекция фазы позволяет значительно повысить чувствительность и помехоустойчивость детектора.
... res = np.sum(phasor_proj * phases) seq_weights[j] = np.abs(res) ...
В качестве веса гипотезы используется модуль полученной суммы.
... for ib in range(block_size): mask = (bit_mask & (1 << (block_size - 1 - ib))) == 0 xm0 = seq_weights[:seq][mask].max() xm1 = seq_weights[:seq][~mask].max() metric = xm1 - xm0 if bit_metric: metric /= np.max(xm0, xm1) symb[i + ib] = metric ...
Полученные гипотезы разделяются на группы нулей (xm0) и единиц (xm1), определяется разностная метрика (metric), позволяющая получить численное значение вероятности значения бит.
... std = np.std(symb) symb *= sym_fac / std symbols[:] = np.clip(symb, -128.0, 127.0) + 128
Значения полученных вероятностей через стандартное отклонение нормализуются в диапазон -128..127 (чтобы уместить в однобайтовое целое значение), для последующего декодирования кодом коррекции ошибок.
Аргумент bit_metric функции задает нормализацию метрики, делая ее относительной, позволяя компенсировать эффект изменения амплитуды сигнала (АРУ/AGC).
Для тестовой записи из предыдущей части статьи, детектор символов возвращает результат в такой форме:
184, 71, 86, 182, 176, 167, 87, 181, 70, 85, 90, 88, 184, 70, 183, 88, 184, 87, 70, 177, 86, 70, 86, 184, 70, 71, 185, 175, 167, 87, 89, 185, 87, 177, 70, 173, 86, 69, 87, 184, 89, 90, 85, 169, 182, 86, 70, 174, 185, 71, 178, 178, 70, 70, 85, 70, 86, 174, 179, 185, 71, 86, 70, 86, 173, 178, 179, 185, 71, 87, 184, 87, 70, 179, 70, 86, 70, 179, 178, 183, 87, 88, 69, 85, 70, 185, 177, 87, 175, 185, 71, 87, 184, 87, 70, 179, 176, 88, 184, 88, 174, 181, 87, 172, 185, 179, 86, 70, 175, 177, 185, 185, 185, 179, 185, 71, 173, 88, 185, 71, 178, 185, 167, 87, 87, 70, 70, 70, 172, 87, 181, 89, 88, 184, 179, 70, 87, 176, 185, 183, 88, 90, 85, 166, 174, 177, 178, 70, 70, 85, 70, 178, 70, 70, 87, 86, 170, 71, 185, 179, 178, 91.
Декодирование кандидатов
После синхронизации и получения вероятностей символов начинается этап декодирование данных.
WSPR_SOFT_SYM_FAC = 50 WSPR_MIN_RMS = 52 * (WSPR_SOFT_SYM_FAC / 64) WSPR_IIFAC = 8 ... class WSPRMonitor(AbstractMonitor): ... def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]: ... for cand in candidates: f1 = cand.freq shift1 = cand.shift drift1 = cand.drift decoded = None ib = 1 while ib <= block_size and decoded is None: if ib < 4: block_size = ib bit_metric = False else: block_size = 1 bit_metric = True idt = 0 while decoded is None and idt <= (128 / WSPR_IIFAC): ii = (idt + 1) / 2 if idt % 2 == 1: ii = -ii ii = WSPR_IIFAC * ii jittered_shift = shift1 + ii self.noncoherent_sequence_detection( iq_signal, symbols, f1, jittered_shift, drift1, WSPR_SOFT_SYM_FAC, block_size, bit_metric ) ...
Каждый из обнаруженных кандидатов на декодирование, проходит цикл некогерентного детектирования сигнала, при этом дополнительно происходит подбор параметров джиттера для более точного извлечения символов. Перебор параметров осуществляется итеративно, основываясь на результатах декодирования, речь о котором пойдет дальше.
Функция noncoherent_sequence_detection извлекает вероятности символов.
... rms = np.sqrt(np.mean(np.square(symbols.astype(np.float32) - 128))) if rms > WSPR_MIN_RMS: ...
Для полученных символов вычисляется среднеквадратичное отклонение, если значение не превышает пороговое, то итерация подбора параметров и извлечения символов повторяется; в противном случае данные символов подвергаются дальнейшей обработке — деинтерливингу и декодированию.
Деинтерливинг
На этапе передачи, для повышения помехоустойчивости и распределения ошибок символов выполнялся процесс интерливинга, при котором происходила перестановка символов; после извлечения вероятностей символов, на принимающей стороне выполняется обратный процесс — деинтерливинг, задача которого восстановить исходную последовательность символов для извлечения данных.
class WSPRMonitor(AbstractMonitor): ... @staticmethod def deinterleave(sym: npt.NDArray[np.uint8]): tmp = np.zeros(WSPR_ND, dtype=np.uint8) p = 0 i = 0 while p < WSPR_ND: j = (((i * 0x80200802) & 0x0884422110) * 0x0101010101 >> 32) & 0xff if j < WSPR_ND: tmp[p] = sym[j] p = p + 1 i += 1 sym[:] = tmp[:] def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]: ... self.deinterleave(symbols) ...
Функция deinterleave реализует деинтерливинг WSPR. В WSPR в угоду скорости применяется разворот бит с использованием двоичных значений и операций над ними (Bit Reverse in 3 operations); константы 0x80200802, 0x0884422110 и 0x0101010101 подобраны так, чтобы выполнять размножение исходных бит на 64 битное слово, применять битовую маску для исключения не интересующих бит и суммирования с получением бит в обратном порядке.
В переменную j вычисляется значение индекса для перестановки, после чего происходит заполнение результирующего массива tmp согласно индексам. Полученный восстановленный массив данных копируется обратно в массив sym.
Восстановленный массив символов подвергается дальнейшему декодированию кодом коррекции ошибок.
Ранее рассмотренные символы данных после деинтерливинга принимают вид:
184, 172, 173, 87, 178, 176, 184, 174, 87, 185, 185, 70, 87, 70, 89, 185, 70, 70, 175, 86, 178, 176, 88, 71, 86, 174, 86, 70, 70, 70, 173, 184, 88, 70, 182, 175, 167, 170, 184, 71, 87, 86, 181, 179, 70, 184, 70, 178, 69, 178, 185, 90, 185, 70, 85, 86, 185, 87, 71, 179, 167, 87, 179, 184, 87, 87, 86, 70, 177, 85, 185, 183, 85, 178, 70, 185, 89, 185, 70, 70, 70, 71, 87, 178, 177, 91, 88, 87, 177, 88, 71, 179, 85, 176, 179, 90, 179, 71, 70, 185, 174, 185, 167, 184, 87, 69, 181, 70, 85, 185, 70, 88, 70, 90, 179, 86, 177, 87, 71, 87, 86, 70, 182, 89, 185, 173, 88, 177, 70, 85, 178, 71, 88, 183, 86, 169, 70, 175, 86, 87, 185, 87, 181, 70, 87, 184, 172, 184, 178, 87, 70, 71, 88, 166, 183, 174, 185, 185, 179, 179, 86, 70.
Декодер Фано
После детекции, извлечения символов и восстановления исходной последовательности с помощью деинтерливинга, данные символов подвергаются процессу декодирования кодами коррекции ошибок.
Класс кодов коррекции ошибок на основе сверточного кода (более подробно рассмотренный в предыдущей части статьи) декодируется такими алгоритмами как: алгоритм Витерби, алгоритм Елинека, алгоритм Фано и т.д..
Каждый из алгоритмов декодирования имеет свои сильные и слабые стороны, так, например, алгоритм Витерби (используемый как стандарт для решения задач декодирования сверточных кодов) перебирает все возможные пути в решетке кода, позволяет получить оптимальный результат за фиксированное время, но имеет экспоненциальный рост алгоритмической сложности при увеличении длины кодового слова. Алгоритм Фано реализует дерево решений (рисунок 5), обладает низким потреблением памяти, может работать с очень большими кодовыми словами, минимизируя вероятности ошибок, но при этом имеет переменное время работы и на зашумленных данных может уйти в бесконечный цикл. Алгоритм Елинека использует для хранения пройденных путей стек с сортировкой по метрике, работает быстрее алгоритма Фано, но требует большого количества памяти и скорость работы алгоритма также сильно зависит от зашумленности данных.
В WSPR используется алгоритм Фано, так как требует минимум ресурсов, хотя при этом, в реализации K9AN присутствует опциональная реализация алгоритма Елинека.

LL_POLY1 = 0xf2d05351 LL_POLY2 = 0xe4613c47
Для декодирования используются те же значениям полиномов, что и при кодировании — Лейланда-Лашбо (Layland-Lushbaugh).
Функция fano реализует алгоритм декодера Фано. Фходные параметры: вероятности символов symbols, количество информационных бит в декодируемом сообщении bits, таблица метрик правдоподобия символов metric_table, шаг изменения порогового значения delta (малое значение обеспечивает высокую точность, но требует большего количества итераций), порог ограничения количества итераций max_iter.
@njit def fano( symbols: npt.NDArray[np.uint8], bits: int, metric_table: npt.NDArray[np.int64], delta: int, max_iter: int, poly1: int = LL_POLY1, poly2: int = LL_POLY2, ) -> typing.Optional[typing.Tuple[int, int, npt.NDArray[np.uint8]]]: s0 = symbols[0::2] s1 = symbols[1::2] metrics_all = np.empty((bits, 4), dtype=np.int64) metrics_all[:, 0] = metric_table[0, s0] + metric_table[0, s1] metrics_all[:, 1] = metric_table[0, s0] + metric_table[1, s1] metrics_all[:, 2] = metric_table[1, s0] + metric_table[0, s1] metrics_all[:, 3] = metric_table[1, s0] + metric_table[1, s1] ...
Для всех пар символов (00, 01, 10, 11, так как скорость алгоритма ½, то есть одному входному биту данных соответствуют два выходных) рассчитываются метрики на основе логарифмических вероятностей из metric_table.
... encstates = np.zeros(bits + 1, dtype=np.uint64) gammas = np.zeros(bits + 1, dtype=np.int64) ...
Массив encstates содержит историю состояний сдвигового регистра, которые, по сути, являются декодированными данными. Массив gammas накапливает метрику пути до текущего узла.
... tms = np.zeros((bits, 2), dtype=np.int64) current_i = np.zeros(bits, dtype=np.uint8) node_id = 0 node_id_max = bits - 1 node_id_tail = bits - 31 lsym = 0 m0 = metrics_all[0, lsym] m1 = metrics_all[0, 3 ^ lsym] if m0 >= m1: tms[0, 0] = m0 tms[0, 1] = m1 else: tms[0, 0] = m1 tms[0, 1] = m0 encstates[0] = 1 threshold = 0 total_iters = max_iter * bits for cycle in range(1, total_iters + 1): current_gamma = gammas[node_id] + (tms[node_id, 0] if current_i[node_id] == 0 else tms[node_id, 1]) ...
Цикл по total_iters реализует движение по дереву декодирования. В каждом узле node_id выбирается наиболее вероятное значение бита 0 или 1 на основе состояния энкодера encstates и значений полиномов poly1, poly2.
... if current_gamma >= threshold: if gammas[node_id] < threshold + delta: if current_gamma >= threshold + delta: threshold += ((current_gamma - threshold) // delta) * delta node_id += 1 gammas[node_id] = current_gamma ...
Пороговое значение threshold зависит от накопления метрики current_gamma, если метрика превышает порог, то процесс декодирования продолжается, но если значение метрики во много раз превышает пороговое значение, то последнее увеличивается на delta.
... if node_id > node_id_max: data = np.zeros(bits // 8, dtype=np.uint8) for j in range(bits // 8): data[j] = encstates[(j + 1) * 8 - 1] & 0xff return current_gamma, cycle, data ...
Функция завершается успешно, если значение node_id достигло значения bits, возвращая при этом значения current_gamma, cycle и значения бит данных data на основе состояния encstates.
... next_state = (encstates[node_id - 1] << 1) encstates[node_id] = next_state tmp = next_state & poly1 tmp ^= tmp >> 16 lsym = PARITY_TAB[(tmp ^ (tmp >> 8)) & 0xff] << 1 tmp = next_state & poly2 tmp ^= tmp >> 16 lsym |= PARITY_TAB[(tmp ^ (tmp >> 8)) & 0xff] ...
Побитовое умножение данных с poly1 и poly2 выделяет биты, участвующие в формировании первого выходного символа. Для получения одного итогового бита выполняется сложение по модулю 2 (xor) между всеми битами tmp.
Для определения четности бит используется xor сдвиг бит.
Таблица четности PARITY_TAB позволяет быстро получить значение четности бита.
Таким образом происходит кодирование данных для проверки гипотез алгоритма при декодировании.
... if node_id >= node_id_tail: tms[node_id, 0] = metrics_all[node_id, lsym] tms[node_id, 1] = -1000000 else: m0 = metrics_all[node_id, lsym] m1 = metrics_all[node_id, 3 ^ lsym] if m0 >= m1: tms[node_id, 0] = m0 tms[node_id, 1] = m1 else: tms[node_id, 0] = m1 tms[node_id, 1] = m0 encstates[node_id] |= 1 current_i[node_id] = 0 continue ...
Хвостовые биты node_id_tail содержат нули для сброса состояния регистра, для них происходит ограничение выбора путем принудительной установки метрики на очень маленькое значение -1000000.
... else: while True: if node_id == 0 or gammas[node_id - 1] < threshold: threshold -= delta if current_i[node_id] != 0: current_i[node_id] = 0 encstates[node_id] ^= 1 break node_id -= 1 if node_id < node_id_tail and current_i[node_id] != 1: current_i[node_id] = 1 encstates[node_id] ^= 1 break
Если при движении вперед значение метрики current_gamma оказалось меньше порогового значения threshold, алгоритм осуществляет обратное движение по дереву, при этом происходит переход на другую ветку в текущем узле, вероятность которой была ниже предыдущей — current_i[node_id] = 1.
Если на всех узлах ветки значение метрики не достигло порогового, то происходит перемещение на один узел назад — node_id -= 1.
Если пороговое значение всё ещё слишком большое, или алгоритм вернулся в самое начало дерева, то значение порога снижается на delta и цикл повторяется с самого начала.
Таблица четности бит:
PARITY_TAB = np.array([ 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 1, 0, ])
Таблицы метрик, рассчитанных с помощью моделирования для 2-FSK тонов, при соотношениях Es/No=0,3,6,9 дБ и среднеквадратичном уровне шума в 50:
WSPR_METRIC_TABLES = np.array([ [0.9782, 0.9695, 0.9689, 0.9669, 0.9666, 0.9653, 0.9638, 0.9618, 0.9599, 0.9601, 0.9592, 0.9570, 0.9556, 0.9540, 0.9525, 0.9527, 0.9486, 0.9477, 0.9450, 0.9436, 0.9424, 0.9400, 0.9381, 0.9360, 0.9340, 0.9316, 0.9301, 0.9272, 0.9254, 0.9224, 0.9196, 0.9171, 0.9154, 0.9123, 0.9076, 0.9061, 0.9030, 0.9000, 0.8965, 0.8934, 0.8903, 0.8874, 0.8834, 0.8792, 0.8760, 0.8726, 0.8685, 0.8639, 0.8599, 0.8550, 0.8504, 0.8459, 0.8422, 0.8364, 0.8320, 0.8262, 0.8215, 0.8159, 0.8111, 0.8052, 0.7996, 0.7932, 0.7878, 0.7812, 0.7745, 0.7685, 0.7616, 0.7550, 0.7479, 0.7405, 0.7336, 0.7255, 0.7184, 0.7102, 0.7016, 0.6946, 0.6860, 0.6769, 0.6687, 0.6598, 0.6503, 0.6416, 0.6325, 0.6219, 0.6122, 0.6016, 0.5920, 0.5818, 0.5711, 0.5606, 0.5487, 0.5374, 0.5266, 0.5142, 0.5020, 0.4908, 0.4784, 0.4663, 0.4532, 0.4405, 0.4271, 0.4144, 0.4006, 0.3865, 0.3731, 0.3594, 0.3455, 0.3304, 0.3158, 0.3009, 0.2858, 0.2708, 0.2560, 0.2399, 0.2233, 0.2074, 0.1919, 0.1756, 0.1590, 0.1427, 0.1251, 0.1074, 0.0905, 0.0722, 0.0550, 0.0381, 0.0183, 0.0000, -0.0185, -0.0391, -0.0571, -0.0760, -0.0966, -0.1160, -0.1370, -0.1584, -0.1787, -0.1999, -0.2214, -0.2423, ... -14.1653, -14.4348, -14.7983, -14.7807, -15.2349, -15.3536, -15.3026, -15.2739, -15.7170, -16.2161, -15.9185, -15.9490, -16.6258, -16.5568, -16.4318, -16.7999, -16.4101, -17.6393, -17.7643, -17.2644, -17.5973, -17.0403, -17.7039, -18.0073, -18.1840, -18.3848, -18.6286, -20.7063, 1.43370769e-019, 2.64031087e-006, 6.6908396e+031, 1.77537994e+028, 2.79322819e+020, 1.94326e-019, 0.00019371575, 2.80722121e-041] ]) WSPR_METRIC_TABLE = np.zeros((2, 256), dtype=np.int64) WSPR_METRIC_TABLE[0, :] = 10 * (WSPR_METRIC_TABLES[2] - WSPR_DECODER_BIAS) WSPR_METRIC_TABLE[1, :] = WSPR_METRIC_TABLE[0, ::-1]
Полные таблицы можно посмотреть по этой ссылке.
Функция decode передает в декодер Фано значения символов, получая при этом на выходе байты данных принятого сообщения в decoded.
WSPR_NUM_BITS = 81 WSPR_FANO_THRESHOLD = 60 WSPR_DECODER_LIM = 10000 class WSPRMonitor(AbstractMonitor): ... def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]: ... decoded = fano( symbols, WSPR_NUM_BITS, WSPR_METRIC_TABLE, WSPR_FANO_THRESHOLD, WSPR_DECODER_LIM ) idt += 1 if self.quick_mode: break ib += 1 if decoded is not None: metric, cycles, dec_data = decoded decodes_pass += 1 payload = dec_data.tobytes() ...
Переменная payload содержит в себе байты WSPR сообщения.
Удаление обработанного сигнала
При многопроходном декодировании, после успешного детектирования и декодирования сообщения из сигнала, в WSPR происходит фильтрация спектра, последовательность принятых тонов подавляется в частотной области, — эта операция позволяет выделять слабые сигналы в спектре, которые изначально перекрывались сильными.
Фильтр реализуется функцией subtract_signal:
class WSPRMonitor(AbstractMonitor): ... def subtract_signal(self, signal: npt.NDArray[np.complex128], f0, shift0: int, drift0, channel_symbols: npt.NDArray[np.uint8]): filtr = 360 samples = WSPR_ND * self.symbol_len cs_rep = np.repeat(channel_symbols - 1.5, self.symbol_len) ...
Массив cs_rep содержит массив тонов сдвинутых относительно центральной частоты, для соответствия тонам 4-FSK.
... idx = np.repeat(np.arange(WSPR_ND), self.symbol_len) drift_eff = (drift0 / 2.0) * (idx - WSPR_ND / 2.0) / (WSPR_ND / 2.0) freq = f0 + drift_eff + cs_rep * self.df phase = 2.0 * np.pi * np.cumsum(freq) * self.dt ref = np.exp(1j * phase) ...
В phase производится реконструкция фазы на основе частот тонов freq (относительно центральной частоты f0), массив ref содержит эталонный комплексный сигнал.
... start = max(shift0, 0) end = min(start + samples, len(signal)) segment = signal[start:end] env = segment * np.conj(ref) ...
Детектирование огибающей сигнала (массив env).
... weights = np.sin(np.pi * np.arange(filtr) / (filtr - 1)) weights /= weights.sum() ...
Формирование сглаживающей функции на основе синуса, для подавления высокочастотных флуктуаций, возникающих при детектировании огибающей.
... env_sm = np.convolve(env, weights, mode="same") partial_sum = np.cumsum(weights) filtr_half = filtr // 2 corr_f = np.ones(samples) corr_f[:filtr_half] = partial_sum[filtr_half: filtr] corr_f[-filtr_half:] = np.flip(partial_sum[filtr_half: filtr]) ...
Свертка сигнала со сглаживающей функцией реализует фильтр нижних частот (env_sm). Для коррекции краевых эффектов после применения окна, рассчитывается корректирующий фактор corr_f на основе кумулятивной суммы весов фильтра (partial_sum). Коррекция краевых эффектов заключается в компенсации потери энергии сигнала на краях фильтра.
... reconstruction = (env_sm / corr_f) * ref signal[start:end] -= reconstruction[start:end]
В массиве reconstruction содержится эталонный сигнал, совпадающий по фазе и амплитуде с сигналами в исходном спектре. Исключение реконструированного сигнала из общего осуществляется операцией разности между signal и reconstruction.
Особенность фильтра subtract_signal заключается в применении адаптивной огибающей, осуществляющей достаточно точную реконструкцию исходного сигнала, что позволяет добиться максимально точной фильтрации сигнала.
Подсчет количества ошибок в принятом сигнале
Для оценки качества принятого сигнала в WSPR осуществляется подсчет ошибок между сырыми принятыми символами и восстановленными кодами коррекции ошибок символами.
class WSPRMonitor(AbstractMonitor): ... @staticmethod def count_sym_err(symbols: npt.NDArray[np.uint8], channel_symbols: npt.NDArray[np.uint8]) -> int: cw = (channel_symbols >= 2).astype(np.uint8) WSPRMonitor.deinterleave(cw) sym_bin = (symbols > 127).astype(np.uint8) err_count = int(np.sum(sym_bin != cw)) return err_count
Функция count_sym_err принимает на вход эталонные символы symbols и сырые данные channel_symbols. Принятые из эфира символы подвергаются деинтерливингу и производится подсчет количества символов, значения которых не совпадают.
Декодирование сообщений
Финальный этап обработки сигнала в WSPR. После извлечения данных сообщения из dec_data, производится декодирование самого сообщения, вычисление метрик о качестве сигнала, фильтрация принятого сигнала и вывод результата.
class WSPRMonitor(AbstractMonitor): ... def decode(self, **kwargs) -> typing.Iterator[WSPRLogItem]: ... payload = dec_data.tobytes() msg = WSPRMessage.decode(payload) ...
В msg содержится объект с текстовым сообщением WSPR. Декодер сообщений был рассмотрел в первой части статьи.
... dt_print = shift1 * self.dt - 1.0 freq_print = (WSPR_CENTER_FREQ + f1) / 1e6 payload = msg.encode() tones = wspr_encode(payload) chan_sym = np.fromiter(tones, dtype=np.uint8) self.subtract_signal(iq_signal, f1, shift1, drift1, chan_sym) sym_err = self.count_sym_err(symbols, chan_sym) yield WSPRLogItem( snr=cand.snr, dT=dt_print, dF=freq_print, payload=payload, crc=0, BER=sym_err, drift=drift1, decode_pass=decodes_pass )
Декодированное сообщение подвергается повторному внутреннему кодированию данных в WSPR сигнал, для фильтрации сигнала и подсчета количества ошибок в принятых символах.
WSPRLogItem содержит данные сообщения и оценки качества сигнала:
@dataclass class LogItem: snr: float dT: float dF: float payload: typing.ByteString crc: int @dataclass class WSPRLogItem(LogItem): BER: int drift: float decode_pass: int
Полный пример декодера с использованием класса WSPRMonitor:
import time from scipy.io.wavfile import read from decoders.wspr import WSPRMonitor from msg.message import WSPRMsgServer def main(): sample_rate, data = read("examples/signal.wav") msg_svr = WSPRMsgServer() mon = WSPRMonitor( sample_rate=sample_rate ) mon.monitor_process(data) for it in mon.decode(): msg = msg_svr.decode(it.payload) print( f"dB: {it.snr:.3f}\t" f"T: {it.dT:.3f}\t" f"DF: {it.dF:.3f}\t" f"BER: {it.BER}\t" f"drift: {it.drift}\t" f"pass: {it.decode_pass}\t" f"Message text: {msg}" ) if __name__ == '__main__': main()
Пример декодированного сообщения WSPR:
dB: 43.265T: -1.000DF: 0.002BER: 0drift: 0pass: 1Message text: R9FEU LO87 50
Заключение
В завершающей цикл статье, посвященной протоколу WSPR были рассмотрены механизмы и принципы цифровой обработки сигналов, перенос спектра, детектирование и выделение слабых сигналов согласованными фильтрами при очень низких значениях SNR, механизмы коррекции ошибок на основе сверточных кодов.
Протокол WSPR может представлять интерес с точки зрения реализации протоколов связи, способных обеспечивать передачу данных при малых мощностях передатчиков, при очень низких значениях SNR, при этом обладая устойчивостью к помехам. Отдельного внимания заслуживают алгоритмы цифровой обработки сигналов и математические принципы, лежащие в их основе.
От автора статьи:
Реализация WSPR декодера на Python позволила во многом ощутить возможности библиотеки numpy, пересмотреть восприятие сигналов, так как вся обработка сигналов осуществляется в I/Q-форме, с использованием комплексных экспонент, что потребовало преодоления некоего порога вхождения.
К сожалению, реализация на Python проигрывает по скорости декодирования с реализацией на Си, хоть и были использованы возможности библиотеки numpy и jit-компилятора, позволяющие получить значительное ускорение вычислений.
По традиции, начавшейся с Q65, автор выражает благодарность всем читателям данного цикла статей и лицам, заинтересованных в устройстве протокола WSPR.