Представьте, что у вас есть многослойный пайплайн обработки данных.
Слой 0 — сырые события: цена тика, действие пользователя, показание датчика.
Слой 1 — агрегаты по инструментам: дельта, гамма, скор.
Слой 2 — агрегаты по секторам: риск на сектор, общая экспозиция.
Слой 3 — портфельные метрики: VaR, ожидаемая прибыль.
Слой 4 — enterprise-лимиты и алерты.
Ширина слоя — 5000 узлов. Количество слоёв — 60. Общее число узлов — 300 000.
Каждую секунду приходит 10 новых событий (изменений на входе). Наивный подход — пересчитать всё с нуля — будет перебирать все 300 000 узлов на каждое обновление. При 10 обновлениях в секунду это 3 млн вычислений узлов в секунду. А если ширина слоя 100 000 и слоёв 100? Получаем 10 млн узлов на пересчёт. Компьютер не справляется.
Классические подходы и их ограничения
Подход |
Проблема |
|---|---|
Полный пересчёт (recompute everything) |
Экспоненциальный рост времени при увеличении графа |
Триггеры в БД |
Не работают для многослойных in-memory графов |
Stream-процессоры (Flink, Kafka Streams) |
Тяжёлые, не для in-memory иерархий |
Кастомный кэш инвалидации |
Сложно реализовать корректно, легко ошибиться |
Требования к решению
Инкрементальный пересчёт: только затронутые узлы, а не все.
Минимальные аллокации в горячем пути.
Точные индексы для быстрых запросов (Fenwick, гистограммы, суммы).
Поддержка двух режимов обновления: SetValue (production) и Mutate (симуляции).
Решение: PhiFlow
PhiFlow — библиотека для .NET 8.0+, реализующая инкрементальные вычисления на слоистых графах фиксированной ширины.
dotnet add package PhiFlow --version 0.1.3
Ключевые идеи
Interval Cone-of-Influence — для каждого обновления вычисляется минимальное множество затронутых узлов в виде непрерывных интервалов на каждом слое.
Фиксированная ширина слоёв — упрощает индексацию и ускоряет доступ.
Точные индексы (Fenwick, гистограммы, суммы) — без приближений и с вероятностными структурами вроде HyperLogLog.
Два режима обновления — SetValue (замена значения) для бизнес-логики и Mutate (дельта) для симуляций.
Быстрый старт
Шаг 1. Создание графа
Определяем параметры:
width— количество узлов в каждом слое.layers— количество слоёв.domain— диапазон дискретных значений (0..DomainSize-1).
using PhiFlow; int width = 5000; int layers = 60; int domain = 1024; var rt = new PhiFlowRuntime(width, layers, domain); // Рекомендуется: зарезервировать место под дельты rt.Reserve(maxDeltaCount: 16);
Шаг 2. Подключение индексов
Индексы ускоряют запросы (CountGreater, RangeCount, Sum, TopKSum). Подключаются к нужному слою.
int lastLayer = layers - 1; // Fenwick-индекс для быстрых CountGreater и RangeCount rt.AttachIndex(lastLayer, new FenwickCountIndex(domain)); // Индекс для суммы значений rt.AttachIndex(lastLayer, new SumIndex(width)); // Индекс для Top-K через гистограммы rt.AttachIndex(lastLayer, new HistogramTopKIndex(domain, width));
Шаг 3. Инициализация входного слоя
Заполняем слой 0 начальными значениями.
var rnd = new Random(1); int[] input = new int[width]; for (int i = 0; i < width; i++) { input[i] = rnd.Next(domain); } rt.SetInput(input); rt.BuildAll(kWork: 50); // полная сборка графа
Шаг 4. Применение обновлений
Вместо пересчёта всего графа передаём только изменившиеся входы.
var updates = new InputUpdate[] { new InputUpdate(index: 10, value: 512), new InputUpdate(index: 123, value: 7), new InputUpdate(index: 2048, value: 999) }; rt.ApplyInputUpdates(updates, kWork: 50);
Библиотека сама определяет, какие узлы затронуты (Interval Cone-of-Influence), и пересчитывает только их.
Шаг 5. Выполнение запросов
Благодаря индексам запросы выполняются мгновенно.
// Количество элементов на последнем слое > 500 long countGt = rt.CountGreater(lastLayer, threshold: 500); // Количество элементов в диапазоне [100, 200) long rangeCount = rt.RangeCount(lastLayer, loInclusive: 100, hiExclusive: 200); // Сумма всех значений на слое long sum = rt.Sum(lastLayer); // Среднее значение long avg = rt.Avg(lastLayer); // Сумма топ-50 значений long topKSum = rt.TopKSum(lastLayer, k: 50);
Полный рабочий пример
using PhiFlow; public class RealTimeAnalyticsPipeline { private readonly PhiFlowRuntime _runtime; private readonly int _lastLayer; public RealTimeAnalyticsPipeline(int width, int layers, int domain) { _runtime = new PhiFlowRuntime(width, layers, domain); _runtime.Reserve(maxDeltaCount: 32); _lastLayer = layers - 1; // Подключаем индексы для аналитики _runtime.AttachIndex(_lastLayer, new FenwickCountIndex(domain)); _runtime.AttachIndex(_lastLayer, new SumIndex(width)); _runtime.AttachIndex(_lastLayer, new HistogramTopKIndex(domain, width)); } public void Initialize(int[] initialData) { _runtime.SetInput(initialData); _runtime.BuildAll(kWork: 50); } public void ProcessEvents(IEnumerable<InputUpdate> events) { var updates = events.ToArray(); _runtime.ApplyInputUpdates(updates, kWork: 50); } public AnalyticsSnapshot GetSnapshot() { return new AnalyticsSnapshot { TotalCount = _runtime.Sum(_lastLayer), HighThresholdCount = _runtime.CountGreater(_lastLayer, 800), MidRangeCount = _runtime.RangeCount(_lastLayer, 200, 600), Top10Sum = _runtime.TopKSum(_lastLayer, 10) }; } } public class AnalyticsSnapshot { public long TotalCount { get; set; } public long HighThresholdCount { get; set; } public long MidRangeCount { get; set; } public long Top10Sum { get; set; } }
Сценарии использования
1. FinTech: управление рисками
4 слоя: инструменты → сектора → портфель → enterprise-лимиты.
int width = 10000; // 10k инструментов int layers = 4; int domain = 100000; // дискретные уровни экспозиции var riskRuntime = new PhiFlowRuntime(width, layers, domain); riskRuntime.AttachIndex(3, new FenwickCountIndex(domain)); // лимиты // Пришло обновление цены на инструмент 42 var tickUpdate = new InputUpdate(42, newExposureValue); riskRuntime.ApplyInputUpdates(new[] { tickUpdate }, kWork: 50); // Мгновенный запрос: сколько секторов превысили лимит? long breachedCount = riskRuntime.CountGreater(2, threshold: 10000);
2. GameDev: симуляция распространения влияния (эпидемия / социальные настроения)
6 слоёв: заражение в городе → риск для соседних городов → плотность заболевших → нагрузка на больницы → дефицит ресурсов → уровень паники.
int cities = 5000; int layers = 6; int domain = 100; // 0-100: процент заражённых / уровень паники var epidemicRuntime = new PhiFlowRuntime(cities, layers, domain); epidemicRuntime.AttachIndex(5, new SumIndex(cities)); // общий уровень паники epidemicRuntime.AttachIndex(5, new HistogramTopKIndex(domain, cities)); // самые паникующие города // Вирус мутировал в городе 123, заражённость выросла до 75% epidemicRuntime.ApplyInputUpdates(new[] { new InputUpdate(123, 75) }, kWork: 50); // Как изменился общий уровень паники по стране? long totalPanic = epidemicRuntime.Sum(5); // сумма процентов паники по всем городам
3. IIoT: предиктивная аналитика
Датчики → станки → линии → заводы → регион.
int sensors = 20000; int layers = 5; int domain = 4096; // показания датчиков 0..4095 var iotRuntime = new PhiFlowRuntime(sensors, layers, domain); iotRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // алерты по регионам // Датчик 5001 показал аномалию var anomalyUpdate = new InputUpdate(5001, 3800); iotRuntime.ApplyInputUpdates(new[] { anomalyUpdate }, kWork: 50); // Сколько заводов в аномальной зоне? long anomalousPlants = iotRuntime.CountGreater(3, threshold: 3500);
4. AdTech: real-time bidding
Импрессия → пользователь → сегмент → кампания → бюджет.
int users = 100000; int layers = 5; int domain = 100; // скор пользователя 0..99 var adRuntime = new PhiFlowRuntime(users, layers, domain); adRuntime.AttachIndex(4, new FenwickCountIndex(domain)); // бюджетные лимиты // Пользователь 42 совершил конверсию var conversionUpdate = new InputUpdate(42, 95); adRuntime.ApplyInputUpdates(new[] { conversionUpdate }, kWork: 50); // Сколько сегментов превысили бюджетный порог? long budgetBreached = adRuntime.CountGreater(3, threshold: 80);
Производительность
Бенчмарки на Intel Core i5-11400F, Windows 11, .NET 8.0, BenchmarkDotNet 0.15.8.
Параметры графа: ширина 5000, слоёв 60, домен 1024.
Сценарий |
Без PhiFlow (полный пересчёт) |
С PhiFlow |
Ускорение |
|---|---|---|---|
1 дельта, KWork=50 |
10.7 с |
0.195 с |
~55x |
4 дельты, KWork=50 |
10.7 с |
0.75 с |
~14x |
1 дельта, KWork=10 |
1.8 с |
0.034 с |
~53x |
Что означают эти цифры:
При одном изменении на входе библиотека пересчитывает не все 300 000 узлов, а только интервал затронутых.
Чем меньше дельт относительно общего объёма графа, тем больше выигрыш.
Индексы добавляют ускорение для запросов: CountGreater, RangeCount, TopKSum выполняются за O(log domain) или O(1).
Сравнение с альтернативами
Характеристика |
PhiFlow |
Полный пересчёт |
Stream processor (Flink) |
ClickHouse |
|---|---|---|---|---|
Инкрементальный пересчёт |
✅ (interval cone) |
❌ |
✅ |
❌ |
Точные индексы |
✅ |
❌ |
❌ |
✅ (но не для per-event) |
In-memory |
✅ |
✅ |
❌ (Java/JVM) |
❌ (диск) |
Многослойные графы |
✅ (родной) |
❌ |
❌ |
❌ |
Латентность на запрос |
микросекунды |
зависит |
миллисекунды+ |
миллисекунды |
Сложность внедрения |
низкая |
высокая |
очень высокая |
средняя |
Пошаговая интеграция в проект
Шаг 1. Моделирование пайплайна
Определите, сколько у вас слоёв и какова ширина каждого. PhiFlow требует фиксированной ширины для всех слоёв — это упрощает индексацию.
Шаг 2. Выбор домена
Домен — это диапазон дискретных значений (0..DomainSize-1). Чем меньше домен, тем компактнее индексы Fenwick и гистограммы.
Шаг 3. Инициализация runtime
var runtime = new PhiFlowRuntime(width, layers, domain); runtime.Reserve(maxDeltaCount: expectedUpdatesPerBatch);
Шаг 4. Подключение индексов к слоям, которые часто запрашиваются
if (needThresholdQueries) runtime.AttachIndex(layer, new FenwickCountIndex(domain)); if (needSumQueries) runtime.AttachIndex(layer, new SumIndex(width)); if (needTopKQueries) runtime.AttachIndex(layer, new HistogramTopKIndex(domain, width));
Шаг 5. Загрузка начальных данных
runtime.SetInput(initialData); runtime.BuildAll(kWork: 50); // 50 — эвристика, подбирается под вашу топологию
Шаг 6. Приём обновлений
void OnInputChanged(int index, int newValue) { var update = new InputUpdate(index, newValue); runtime.ApplyInputUpdates(new[] { update }, kWork: 50); }
Шаг 7. Маршрутизация запросов через индексы
public long GetHighRiskCount(int threshold) => runtime.CountGreater(riskLayer, threshold);
Два режима обновлений
SetValue (рекомендуется для production)
Заменяет значение узла на новое.
var update = new InputUpdate(index: 42, value: 512); runtime.ApplyInputUpdates(new[] { update }, kWork: 50);
Mutation (для симуляций и тестирования)
Детерминированная мутация значения. Полезно, когда нужно воспроизвести последовательность изменений.
var mutation = new InputMutation(index: 42, delta: +5); runtime.ApplyInputMutations(new[] { mutation }, kWork: 50);
Бесплатное тестирование — в рамках Community Edition. Коммерческое использование требует лицензии.
Где взять
NuGet: dotnet add package PhiFlow
GitHub (бенчмарки): https://github.com/likeslines-maker/PhiFlow
PhiFlow — это библиотека для инкрементальных вычислений на слоистых графах фиксированной ширины.
Она решает конкретную задачу: когда в многослойный пайплайн приходит небольшое количество обновлений, а вам нужно мгновенно получать точные агрегаты (CountGreater, RangeCount, Sum, TopK) на любом слое.
Библиотека не пытается заменить полноценные stream-процессоры или OLAP-базы. Она занимает свою нишу: in-memory, микросекундные латентности, точные индексы, минимальные аллокации.
Если ваш пайплайн из 60 слоёв пересчитывается за 10 секунд вместо 0.2 — возможно, вы просто считали не тем способом.
Комментарии (8)

EasyGame
29.05.2026 07:46А в чем преимущество перед родным дотнетовским TPL Dataflow?

arhip1986 Автор
29.05.2026 07:46TPL Dataflow — для асинхронных конвейеров и произвольных графов. PhiFlow — для слоистых stencil-пайплайнов, где нужно после небольшого изменения получить мгновенные агрегаты (CountGreater, TopK, Sum) по всем слоям. Ускорение достигается за счёт интервального конуса влияния (не пересчитываем весь слой, только affected interval) и точных индексов (Fenwick, гистограммы). TPL Dataflow не умеет ни того, ни другого из коробки. Если вам нужно и то, и другое — поставьте PhiFlow внутрь блока Dataflow.
impwx
Не нашел ни в одном примере, как задаются связи между элементами слоя и между слоями
arhip1986 Автор
Связи жёстко зашиты в DefaultPhi.FromPrevLayer: каждый узел зависит от [i, i-1, i+1, i+2] предыдущего слоя (кольцевое замыкание). Это не конфигурируется через API, но можно реализовать свой IPhi с любой логикой. Однако если вы меняете радиус зависимости, интервальный пропагатор может дать неоптимальный affected set (он зашит на расширение 2 влево, 1 вправо). Для произвольных графов PhiFlow, скорее всего, не подойдёт — это библиотека для слоистых stencil-пайплайнов.
impwx
А объясните тогда, пожалуйста, пример 2 из области GameDev, где провинции и налоги, как это вообще работает? Доход провинции 1 зависит от налога провинций 1, 2, 3, 4? Как именно зависит, с какими коэффициентами?
arhip1986 Автор
В текущей реализации
DefaultPhiзависимости детерминированные и не имеют настраиваемых коэффициентов. В примере с провинциями коэффициенты были бы единичными (все влияния равны).Как это работает в коде
Напомню, что
DefaultPhi.FromPrevLayerвычисляет значение узла так:Что это значит для примера «провинции и налоги»
Пусть у нас 5 провинций, расположенных кольцом (провинция 0 соседствует с 4 и 1).
Провинции: [0] [1] [2] [3] [4]
Входной слой (layer 0) — налог в каждой провинции. Значения от 0 до domainSize-1 (например, 0..1000).
Выходной слой (layer 1) — доход провинции. Доход провинции i вычисляется из налогов провинций i, i-1, i+1, i+2.
То есть:
Доход(провинция 2) = F( налог(2), налог(1), налог(3), налог(4) )
Функция F — не взвешенная сумма, а хеширующая функция со следующими свойствами:
Детерминированная — одинаковый вход всегда даёт одинаковый выход.
Чувствительная к изменениям — меняется любой налог → меняется доход.
Принимает значения только в диапазоне [0..domainSize-1].
Имеет параметр kWork — количество дополнительных раундов перемешивания (чем больше, тем «сложнее» зависимость).
Почему не обычная формула вроде налог 0.7 + налог_соседа 0.3?
Потому что PhiFlow — не калькулятор линейных уравнений. Она решает другую задачу:
Нужно детерминированное отображение с предсказуемым распределением выходов.
Коэффициенты не нужны, потому что значения всё равно дискретны и домен ограничен.
Хеширующая функция даёт хорошее рассеивание — даже маленькое изменение на входе может дать любое значение на выходе. Это полезно для симуляций и тестирования, когда не нужна точная экономическая модель, а нужна быстрая цепочка вычислений.
Как сделать свой пример с коэффициентами?
Реализовать кастомный
IPhi:Использование:
Что происходит с интервальным пропагатором?
Если вы изменили радиус зависимости (например, убрали зависимость от
i+2), интервальный пропагаторIntervalPropagatorвсё равно будет предполагать расширение 2 влево и 1 вправо, потому что он зашит на топологиюDefaultPhi. Это значит:ApplyInputUpdatesможет пересчитывать больше узлов, чем реально нужно (субоптимально, но корректно).Если вы хотите точного соответствия, нужно либо использовать полный пересчёт (
RecomputeAll), либо переопределять пропагатор.Пример с налогами в статье иллюстрирует класс задач, где PhiFlow применима:
Есть много узлов (провинций).
Есть много слоёв (доход → счастье → производительность → сила армии).
Приходит одно изменение (налог подняли).
Нужно быстро пересчитать всё и ответить на запросы (тотальная сила армии, топ-3 самых сильных провинций).
Конкретные коэффициенты и формула — на разработчике игры. PhiFlow даёт инструмент для быстрого инкрементального пересчёта и готовые индексы для запросов. Саму экономическую модель можно написать в кастомном
IPhi.impwx
Позовите пожалуйста оператора - я хочу пообщаться с живым человеком, а не с ллмкой.
В каком мире доход провинции может зависеть от налогов ее соседей??? С одинаковыми коэффициентами между всеми элементами в кольце и на всех слоях? Это абсурд, который нейросеть видимо сначала нагаллюцинировала, а теперь продолжает на голубом глазу расписывать длинными предложениями с примерами. Соответственно, остальные примеры также могут быть натягиванием совы на глобус.
arhip1986 Автор
да, увидел в коде stencil
[i, i-1, i+1, i+2]и притянул к нему первую попавшуюся игровую метафору, пример был неудачнымP.S. Спасибо что обратили внимание, статью отредактировал, добавил корректный пример - 6 слоёв: заражение в городе → риск для соседних городов → плотность заболевших → нагрузка на больницы → дефицит ресурсов → уровень паники.