Представьте, что у вас есть многослойный пайплайн обработки данных.

  • Слой 0 — сырые события: цена тика, действие пользователя, показание датчика.

  • Слой 1 — агрегаты по инструментам: дельта, гамма, скор.

  • Слой 2 — агрегаты по секторам: риск на сектор, общая экспозиция.

  • Слой 3 — портфельные метрики: VaR, ожидаемая прибыль.

  • Слой 4 — enterprise-лимиты и алерты.

Ширина слоя — 5000 узлов. Количество слоёв — 60. Общее число узлов — 300 000.

Каждую секунду приходит 10 новых событий (изменений на входе). Наивный подход — пересчитать всё с нуля — будет перебирать все 300 000 узлов на каждое обновление. При 10 обновлениях в секунду это 3 млн вычислений узлов в секунду. А если ширина слоя 100 000 и слоёв 100? Получаем 10 млн узлов на пересчёт. Компьютер не справляется.

Классические подходы и их ограничения

Подход

Проблема

Полный пересчёт (recompute everything)

Экспоненциальный рост времени при увеличении графа

Триггеры в БД

Не работают для многослойных in-memory графов

Stream-процессоры (Flink, Kafka Streams)

Тяжёлые, не для in-memory иерархий

Кастомный кэш инвалидации

Сложно реализовать корректно, легко ошибиться

Требования к решению

  • Инкрементальный пересчёт: только затронутые узлы, а не все.

  • Минимальные аллокации в горячем пути.

  • Точные индексы для быстрых запросов (Fenwick, гистограммы, суммы).

  • Поддержка двух режимов обновления: SetValue (production) и Mutate (симуляции).

Решение: PhiFlow

PhiFlow — библиотека для .NET 8.0+, реализующая инкрементальные вычисления на слоистых графах фиксированной ширины.

dotnet add package PhiFlow --version 0.1.3

Ключевые идеи

  1. Interval Cone-of-Influence — для каждого обновления вычисляется минимальное множество затронутых узлов в виде непрерывных интервалов на каждом слое.

  2. Фиксированная ширина слоёв — упрощает индексацию и ускоряет доступ.

  3. Точные индексы (Fenwick, гистограммы, суммы) — без приближений и с вероятностными структурами вроде HyperLogLog.

  4. Два режима обновления — SetValue (замена значения) для бизнес-логики и Mutate (дельта) для симуляций.

Быстрый старт

Шаг 1. Создание графа

Определяем параметры:

  • width — количество узлов в каждом слое.

  • layers — количество слоёв.

  • domain — диапазон дискретных значений (0..DomainSize-1).

using PhiFlow;
int width = 5000;
int layers = 60;
int domain = 1024;
var rt = new PhiFlowRuntime(width, layers, domain);
// Рекомендуется: зарезервировать место под дельты
rt.Reserve(maxDeltaCount: 16);

Шаг 2. Подключение индексов

Индексы ускоряют запросы (CountGreater, RangeCount, Sum, TopKSum). Подключаются к нужному слою.

int lastLayer = layers - 1;
// Fenwick-индекс для быстрых CountGreater и RangeCount
rt.AttachIndex(lastLayer, new FenwickCountIndex(domain));
// Индекс для суммы значений
rt.AttachIndex(lastLayer, new SumIndex(width));
// Индекс для Top-K через гистограммы
rt.AttachIndex(lastLayer, new HistogramTopKIndex(domain, width));

Шаг 3. Инициализация входного слоя

Заполняем слой 0 начальными значениями.

var rnd = new Random(1);
int[] input = new int[width];
for (int i = 0; i < width; i++)
{
    input[i] = rnd.Next(domain);
}
rt.SetInput(input);
rt.BuildAll(kWork: 50); // полная сборка графа

Шаг 4. Применение обновлений

Вместо пересчёта всего графа передаём только изменившиеся входы.

var updates = new InputUpdate[]
{
    new InputUpdate(index: 10, value: 512),
    new InputUpdate(index: 123, value: 7),
    new InputUpdate(index: 2048, value: 999)
};
rt.ApplyInputUpdates(updates, kWork: 50);

Библиотека сама определяет, какие узлы затронуты (Interval Cone-of-Influence), и пересчитывает только их.

Шаг 5. Выполнение запросов

Благодаря индексам запросы выполняются мгновенно.

// Количество элементов на последнем слое > 500
long countGt = rt.CountGreater(lastLayer, threshold: 500);
// Количество элементов в диапазоне [100, 200)
long rangeCount = rt.RangeCount(lastLayer, loInclusive: 100, hiExclusive: 200);
// Сумма всех значений на слое
long sum = rt.Sum(lastLayer);
// Среднее значение
long avg = rt.Avg(lastLayer);
// Сумма топ-50 значений
long topKSum = rt.TopKSum(lastLayer, k: 50);

Полный рабочий пример

using PhiFlow;
public class RealTimeAnalyticsPipeline
{
    private readonly PhiFlowRuntime _runtime;
    private readonly int _lastLayer;
    
    public RealTimeAnalyticsPipeline(int width, int layers, int domain)
    {
        _runtime = new PhiFlowRuntime(width, layers, domain);
        _runtime.Reserve(maxDeltaCount: 32);
        _lastLayer = layers - 1;
        
        // Подключаем индексы для аналитики
        _runtime.AttachIndex(_lastLayer, new FenwickCountIndex(domain));
        _runtime.AttachIndex(_lastLayer, new SumIndex(width));
        _runtime.AttachIndex(_lastLayer, new HistogramTopKIndex(domain, width));
    }
    
    public void Initialize(int[] initialData)
    {
        _runtime.SetInput(initialData);
        _runtime.BuildAll(kWork: 50);
    }
    
    public void ProcessEvents(IEnumerable<InputUpdate> events)
    {
        var updates = events.ToArray();
        _runtime.ApplyInputUpdates(updates, kWork: 50);
    }
    
    public AnalyticsSnapshot GetSnapshot()
    {
        return new AnalyticsSnapshot
        {
            TotalCount = _runtime.Sum(_lastLayer),
            HighThresholdCount = _runtime.CountGreater(_lastLayer, 800),
            MidRangeCount = _runtime.RangeCount(_lastLayer, 200, 600),
            Top10Sum = _runtime.TopKSum(_lastLayer, 10)
        };
    }
}
public class AnalyticsSnapshot
{
    public long TotalCount { get; set; }
    public long HighThresholdCount { get; set; }
    public long MidRangeCount { get; set; }
    public long Top10Sum { get; set; }
}

Сценарии использования

1. FinTech: управление рисками

4 слоя: инструменты → сектора → портфель → enterprise-лимиты.

int width = 10000;  // 10k инструментов
int layers = 4;
int domain = 100000; // дискретные уровни экспозиции
var riskRuntime = new PhiFlowRuntime(width, layers, domain);
riskRuntime.AttachIndex(3, new FenwickCountIndex(domain)); // лимиты
// Пришло обновление цены на инструмент 42
var tickUpdate = new InputUpdate(42, newExposureValue);
riskRuntime.ApplyInputUpdates(new[] { tickUpdate }, kWork: 50);
// Мгновенный запрос: сколько секторов превысили лимит?
long breachedCount = riskRuntime.CountGreater(2, threshold: 10000);

2. GameDev: симуляция распространения влияния (эпидемия / социальные настроения)

6 слоёв: заражение в городе → риск для соседних городов → плотность заболевших → нагрузка на больницы → дефицит ресурсов → уровень паники.

int cities = 5000;
int layers = 6;
int domain = 100; // 0-100: процент заражённых / уровень паники
var epidemicRuntime = new PhiFlowRuntime(cities, layers, domain);
epidemicRuntime.AttachIndex(5, new SumIndex(cities));              // общий уровень паники
epidemicRuntime.AttachIndex(5, new HistogramTopKIndex(domain, cities)); // самые паникующие города
// Вирус мутировал в городе 123, заражённость выросла до 75%
epidemicRuntime.ApplyInputUpdates(new[] { new InputUpdate(123, 75) }, kWork: 50);
// Как изменился общий уровень паники по стране?
long totalPanic = epidemicRuntime.Sum(5); // сумма процентов паники по всем городам

3. IIoT: предиктивная аналитика

Датчики → станки → линии → заводы → регион.

int sensors = 20000;
int layers = 5;
int domain = 4096; // показания датчиков 0..4095
var iotRuntime = new PhiFlowRuntime(sensors, layers, domain);
iotRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // алерты по регионам
// Датчик 5001 показал аномалию
var anomalyUpdate = new InputUpdate(5001, 3800);
iotRuntime.ApplyInputUpdates(new[] { anomalyUpdate }, kWork: 50);
// Сколько заводов в аномальной зоне?
long anomalousPlants = iotRuntime.CountGreater(3, threshold: 3500);

4. AdTech: real-time bidding

Импрессия → пользователь → сегмент → кампания → бюджет.

int users = 100000;
int layers = 5;
int domain = 100; // скор пользователя 0..99
var adRuntime = new PhiFlowRuntime(users, layers, domain);
adRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // бюджетные лимиты
// Пользователь 42 совершил конверсию
var conversionUpdate = new InputUpdate(42, 95);
adRuntime.ApplyInputUpdates(new[] { conversionUpdate }, kWork: 50);
// Сколько сегментов превысили бюджетный порог?
long budgetBreached = adRuntime.CountGreater(3, threshold: 80);

Производительность

Бенчмарки на Intel Core i5-11400F, Windows 11, .NET 8.0, BenchmarkDotNet 0.15.8.

Параметры графа: ширина 5000, слоёв 60, домен 1024.

Сценарий

Без PhiFlow (полный пересчёт)

С PhiFlow

Ускорение

1 дельта, KWork=50

10.7 с

0.195 с

~55x

4 дельты, KWork=50

10.7 с

0.75 с

~14x

1 дельта, KWork=10

1.8 с

0.034 с

~53x

Что означают эти цифры:

  • При одном изменении на входе библиотека пересчитывает не все 300 000 узлов, а только интервал затронутых.

  • Чем меньше дельт относительно общего объёма графа, тем больше выигрыш.

  • Индексы добавляют ускорение для запросов: CountGreater, RangeCount, TopKSum выполняются за O(log domain) или O(1).

Сравнение с альтернативами

Характеристика

PhiFlow

Полный пересчёт

Stream processor (Flink)

ClickHouse

Инкрементальный пересчёт

✅ (interval cone)

Точные индексы

✅ (но не для per-event)

In-memory

❌ (Java/JVM)

❌ (диск)

Многослойные графы

✅ (родной)

Латентность на запрос

микросекунды

зависит

миллисекунды+

миллисекунды

Сложность внедрения

низкая

высокая

очень высокая

средняя

Пошаговая интеграция в проект

Шаг 1. Моделирование пайплайна

Определите, сколько у вас слоёв и какова ширина каждого. PhiFlow требует фиксированной ширины для всех слоёв — это упрощает индексацию.

Шаг 2. Выбор домена

Домен — это диапазон дискретных значений (0..DomainSize-1). Чем меньше домен, тем компактнее индексы Fenwick и гистограммы.

Шаг 3. Инициализация runtime

var runtime = new PhiFlowRuntime(width, layers, domain);
runtime.Reserve(maxDeltaCount: expectedUpdatesPerBatch);

Шаг 4. Подключение индексов к слоям, которые часто запрашиваются

if (needThresholdQueries)
    runtime.AttachIndex(layer, new FenwickCountIndex(domain));
if (needSumQueries)
    runtime.AttachIndex(layer, new SumIndex(width));
if (needTopKQueries)
    runtime.AttachIndex(layer, new HistogramTopKIndex(domain, width));

Шаг 5. Загрузка начальных данных

runtime.SetInput(initialData);
runtime.BuildAll(kWork: 50); // 50 — эвристика, подбирается под вашу топологию

Шаг 6. Приём обновлений

void OnInputChanged(int index, int newValue)
{
    var update = new InputUpdate(index, newValue);
    runtime.ApplyInputUpdates(new[] { update }, kWork: 50);
}

Шаг 7. Маршрутизация запросов через индексы

public long GetHighRiskCount(int threshold) =>
    runtime.CountGreater(riskLayer, threshold);

Два режима обновлений

SetValue (рекомендуется для production)

Заменяет значение узла на новое.

var update = new InputUpdate(index: 42, value: 512);
runtime.ApplyInputUpdates(new[] { update }, kWork: 50);

Mutation (для симуляций и тестирования)

Детерминированная мутация значения. Полезно, когда нужно воспроизвести последовательность изменений.

var mutation = new InputMutation(index: 42, delta: +5);
runtime.ApplyInputMutations(new[] { mutation }, kWork: 50);

Бесплатное тестирование — в рамках Community Edition. Коммерческое использование требует лицензии.

Где взять

NuGet: dotnet add package PhiFlow

GitHub (бенчмарки): https://github.com/likeslines-maker/PhiFlow

PhiFlow — это библиотека для инкрементальных вычислений на слоистых графах фиксированной ширины.

Она решает конкретную задачу: когда в многослойный пайплайн приходит небольшое количество обновлений, а вам нужно мгновенно получать точные агрегаты (CountGreater, RangeCount, Sum, TopK) на любом слое.

Библиотека не пытается заменить полноценные stream-процессоры или OLAP-базы. Она занимает свою нишу: in-memory, микросекундные латентности, точные индексы, минимальные аллокации.

Если ваш пайплайн из 60 слоёв пересчитывается за 10 секунд вместо 0.2 — возможно, вы просто считали не тем способом.

Комментарии (8)


  1. impwx
    29.05.2026 07:46

    Не нашел ни в одном примере, как задаются связи между элементами слоя и между слоями


    1. arhip1986 Автор
      29.05.2026 07:46

      Связи жёстко зашиты в DefaultPhi.FromPrevLayer: каждый узел зависит от [i, i-1, i+1, i+2] предыдущего слоя (кольцевое замыкание). Это не конфигурируется через API, но можно реализовать свой IPhi с любой логикой. Однако если вы меняете радиус зависимости, интервальный пропагатор может дать неоптимальный affected set (он зашит на расширение 2 влево, 1 вправо). Для произвольных графов PhiFlow, скорее всего, не подойдёт — это библиотека для слоистых stencil-пайплайнов.


      1. impwx
        29.05.2026 07:46

        А объясните тогда, пожалуйста, пример 2 из области GameDev, где провинции и налоги, как это вообще работает? Доход провинции 1 зависит от налога провинций 1, 2, 3, 4? Как именно зависит, с какими коэффициентами?


        1. arhip1986 Автор
          29.05.2026 07:46

          В текущей реализации DefaultPhi зависимости детерминированные и не имеют настраиваемых коэффициентов. В примере с провинциями коэффициенты были бы единичными (все влияния равны).

          Как это работает в коде

          Напомню, что DefaultPhi.FromPrevLayer вычисляет значение узла так:

          int p0 = prev[i];                      // налог текущей провинции
          int p1 = prev[Mod(i - 1, width)];      // налог левой провинции
          int p2 = prev[Mod(i + 1, width)];      // налог правой провинции
          int p3 = prev[Mod(i + 2, width)];      // налог следующей провинции
          uint x = (uint)p0;
          x ^= BitOperations.RotateLeft((uint)p1, 13);
          x ^= BitOperations.RotateLeft((uint)p2, 29);
          x ^= (uint)p3 * 0x9E3779B9u;
          for (int k = 0; k < kWork; k++)
              x = Mix32(x + (uint)p0 + ((uint)p1 ^ (uint)p2) + (uint)p3 * (uint)(k + 1));
          return ReduceToDomain(x, domainSize);

          Что это значит для примера «провинции и налоги»

          Пусть у нас 5 провинций, расположенных кольцом (провинция 0 соседствует с 4 и 1).

          Провинции: [0] [1] [2] [3] [4]

          Входной слой (layer 0) — налог в каждой провинции. Значения от 0 до domainSize-1 (например, 0..1000).

          Выходной слой (layer 1) — доход провинции. Доход провинции i вычисляется из налогов провинций i, i-1, i+1, i+2.

          То есть:

          Доход(провинция 2) = F( налог(2), налог(1), налог(3), налог(4) )

          Функция F — не взвешенная сумма, а хеширующая функция со следующими свойствами:

          1. Детерминированная — одинаковый вход всегда даёт одинаковый выход.

          2. Чувствительная к изменениям — меняется любой налог → меняется доход.

          3. Принимает значения только в диапазоне [0..domainSize-1].

          4. Имеет параметр kWork — количество дополнительных раундов перемешивания (чем больше, тем «сложнее» зависимость).

          Почему не обычная формула вроде налог 0.7 + налог_соседа 0.3?

          Потому что PhiFlow — не калькулятор линейных уравнений. Она решает другую задачу:

          • Нужно детерминированное отображение с предсказуемым распределением выходов.

          • Коэффициенты не нужны, потому что значения всё равно дискретны и домен ограничен.

          • Хеширующая функция даёт хорошее рассеивание — даже маленькое изменение на входе может дать любое значение на выходе. Это полезно для симуляций и тестирования, когда не нужна точная экономическая модель, а нужна быстрая цепочка вычислений.

          Как сделать свой пример с коэффициентами?

          Реализовать кастомный IPhi:

          public class EconomicPhi : IPhi
          {
              private readonly float _selfWeight = 0.7f;
              private readonly float _neighborWeight = 0.3f;
              public int FromPrevLayer(ReadOnlySpan<int> prev, int i, int width, int domainSize, int kWork)
              {
                  int self = prev[i];
                  int left = prev[Mod(i - 1, width)];
                  int right = prev[Mod(i + 1, width)];
                  // Взвешенная сумма
                  float weighted = _selfWeight * self + _neighborWeight * (left + right) / 2f;
                  
                  // Приведение к домену
                  int result = (int)Math.Round(weighted);
                  return Math.Clamp(result, 0, domainSize - 1);
              }
              public int MutateInputInDomain(int oldValue, int domainSize)
              {
                  // Простая мутация: случайный сдвиг
                  var newValue = oldValue + Random.Shared.Next(-10, 11);
                  return Math.Clamp(newValue, 0, domainSize - 1);
              }
              private static int Mod(int a, int m)
              {
                  int r = a % m;
                  return r < 0 ? r + m : r;
              }
          }

          Использование:

          var phi = new EconomicPhi();
          var runtime = new PhiFlowRuntime(width: 5000, layers: 6, domainSize: 1000, phi);
          runtime.SetInput(taxValues);  // налоги
          runtime.BuildAll(kWork: 0);   // kWork не используется в кастомной логике
          // Доходы в layer 1 теперь считаются по вашей формуле
          var incomes = runtime.GetLayerSpan(1);

          Что происходит с интервальным пропагатором?

          Если вы изменили радиус зависимости (например, убрали зависимость от i+2), интервальный пропагатор IntervalPropagator всё равно будет предполагать расширение 2 влево и 1 вправо, потому что он зашит на топологию DefaultPhi. Это значит:

          • ApplyInputUpdates может пересчитывать больше узлов, чем реально нужно (субоптимально, но корректно).

          • Если вы хотите точного соответствия, нужно либо использовать полный пересчёт (RecomputeAll), либо переопределять пропагатор.

          Пример с налогами в статье иллюстрирует класс задач, где PhiFlow применима:

          • Есть много узлов (провинций).

          • Есть много слоёв (доход → счастье → производительность → сила армии).

          • Приходит одно изменение (налог подняли).

          • Нужно быстро пересчитать всё и ответить на запросы (тотальная сила армии, топ-3 самых сильных провинций).

          Конкретные коэффициенты и формула — на разработчике игры. PhiFlow даёт инструмент для быстрого инкрементального пересчёта и готовые индексы для запросов. Саму экономическую модель можно написать в кастомном IPhi.


          1. impwx
            29.05.2026 07:46

            Позовите пожалуйста оператора - я хочу пообщаться с живым человеком, а не с ллмкой.

            Доход(провинция 2) = F( налог(2), налог(1), налог(3), налог(4) )

            В каком мире доход провинции может зависеть от налогов ее соседей??? С одинаковыми коэффициентами между всеми элементами в кольце и на всех слоях? Это абсурд, который нейросеть видимо сначала нагаллюцинировала, а теперь продолжает на голубом глазу расписывать длинными предложениями с примерами. Соответственно, остальные примеры также могут быть натягиванием совы на глобус.


            1. arhip1986 Автор
              29.05.2026 07:46

              да, увидел в коде stencil [i, i-1, i+1, i+2] и притянул к нему первую попавшуюся игровую метафору, пример был неудачным

              P.S. Спасибо что обратили внимание, статью отредактировал, добавил корректный пример - 6 слоёв: заражение в городе → риск для соседних городов → плотность заболевших → нагрузка на больницы → дефицит ресурсов → уровень паники.


  1. EasyGame
    29.05.2026 07:46

    А в чем преимущество перед родным дотнетовским TPL Dataflow?


    1. arhip1986 Автор
      29.05.2026 07:46

      TPL Dataflow — для асинхронных конвейеров и произвольных графов. PhiFlow — для слоистых stencil-пайплайнов, где нужно после небольшого изменения получить мгновенные агрегаты (CountGreater, TopK, Sum) по всем слоям. Ускорение достигается за счёт интервального конуса влияния (не пересчитываем весь слой, только affected interval) и точных индексов (Fenwick, гистограммы). TPL Dataflow не умеет ни того, ни другого из коробки. Если вам нужно и то, и другое — поставьте PhiFlow внутрь блока Dataflow.