Как правильно использовать возможности параллельного программирования?
Зачем программистам математика и зачем знать алгоритмы?

Представьте что у вас есть 10 задач. Каждая пронумерована от 1 до 10, а так же каждая задача выполняется секунд, равным номеру задачи. 1я задача - 1 секунда, 10я задача - 10 секунд - она самая "тяжелая".

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Выполняя задачи последовательно, мы получим выполнение всех за 55 секунд. Как мы узнали это число? Есть несколько способов.

  1. Сложив все вместе( 1+2+3...+10). Можно использовать цикл fori. Еще это называется "Сумма арифметической прогрессии".

  2. Использовать формулу (n * (n+1)) / 2 (Это к вопросу, "зачем нам нужна математика?"). Она же с побитовым сдвигом (n * (n+1)) >> 1

  3. Так же мы можем заметить, что если сложить первое и последнее числа, мы получим 11. 1 + 10 = 11. Далее 2+9 тоже будет 11, и так до 5+6=11. То есть 11+11+11+11+11 = 11*(10/2) = 11*5 = 55.

Последнее - это одно из решений на Leetcode для нескольких задач, где мы используем алгоритм под названием "Two pointers" или "Два указателя".

Используем многопоточность, что бы ускорить выполнение нашей задачи. У нас есть 5 ядер по одному потоку на каждый. Каждая задача изолирована друг от друга и мы имеем три подхода к решению данной задачи.

  1. Поделим количество задач на количество ядер, и выполним задачи в том порядке, в котором они расположены сейчас. То есть мы получим по 2 задачи на ядро, которые будут выполняться последовательно.

  2. Переиспользуем наши потоки. Освободившиеся потоки будут брать свободные задачи так же расположенные по порядку(Аналог пула).

  3. Как и в первом варианте, поделим задачи между ядрами, но теперь одинаково распределим сложность задач для каждого ядра, используя алгоритм "Two pointers"(Это к вопросу "Зачем программистам знать алгоритмы?").

Итак, к результатам!

Результаты будут под теми же номерами, что и решения:

  1. Мы поделили задачи между ядрами последовательно и получилось, что на последнем ядре выполняются самые сложные задачи по 9 и 10 секунд, что в сумме 19. Остальные ядра уже закончили свои задачи, и как итог - это решение равняется сумме самых сложных задач на ядро - 19 секунд.

  2. Здесь 5е ядро, после работы в 5 секунд, заберет себе задачу на 10 секунд, итого, задача займет 15 секунд. Неплохо.

  3. 5 ядер будут выполнять задачи по 11 секунд (10 + 1, 9 + 2, 8 + 3, 7 + 4, 6 + 5). Как итог - все выполнение займет 11 секунд. Лучший результат!

Очень важная мысль из книги "Java concurrency in action"!
"Реальная отдача от разделения рабочей нагрузки программы на задачи достигается при наличии большого числа независимых, однородных задач, которые могут обрабатываться конкурентно."

Обычно мы не имеем дел с "Суммой арифметической прогрессии", где все задачи имеют уникальное время последовательной продолжительности, а, вероятнее, имеем задачи повторяющейся продолжительности. Эти задачи будут так же отсортированы по сложности выполнения.

[1, 3, 4, 4, 6, 7, 7, 8, 9, 10]

Здесь нам не помогут предыдущие формулы, но общее количество времени на последовательное исполнение мы посчитать можем. 59 секунд.

Результаты в каждом из подходов:

  1. 19 секунд.

  2. 16 секунд.

  3. 13 секунд.

Опять побеждает решение с равномерным распределением задач по сложности выполнения.

Давайте выразим это в коде. Для удобства я так же использую библиотеку Lombok.

Представьте. Мы работаем в инвестиционной компании, которая недавно открылась. Нам присылают большой пакет необработанных данных, в котором содержатся инвесторы и инвестиции, которые они инвестировали в разные компании в течении последних 10 лет.

Наша задача состоит в том, что бы получить эти данные и отправить их обрабатываться в другой сервис со своей бизнес логикой. Сейчас, возможно, звучит непонятно, но дальше все станет ясно.

Оставляю ссылку на гитхаб со всем написанным кодом. Так что если не хочется все писать самостоятельно, можете просто скачать здесь.

Для начала создадим три модели данных. Investor, Investment, ExecutionMethod. Последняя нужна просто для удобства.

@Data
public class ExecutionMethod { 
  private String name; 
  private long time; 
  private String description;
  public ExecutionMethod(String name, String description) {
      this.name = name;
      this.description = description;
  }
  
  public void represent() {
      System.out.println(name + ". " + description); // TODO Delete
  }
  
  public void showExecutionTime() {
      System.out.println(name + " " + time + " ms."); // TODO Delete
  }
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Investor { 
  private String name; 
  private List investments;

}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Investment { 
  private String from; 
  private String to; 
  private Long price; 
}

Так же нам понадобятся два класса помощника. BusinessLogic (будет симулировать бизнес-логику) и MyPhaser (Поможет нам с синхронизацией).

public class BusinessLogic {
  public static void handle(List<Investor> investors) {
      try {
          System.out.println("Инвесторов: " + investors.size() + " с порядком инвестиций: " + investors.stream().map(x -> x.getInvestments().size()).collect(Collectors.toList())); // TODO Delete
          for (Investor investor : investors)
              for (int j = 0; j < investor.getInvestments().size(); j++)
                  Thread.sleep(1);
      } catch (InterruptedException e) {
          e.printStackTrace();
          throw new RuntimeException(e);
      }
  }
  
  public static List<Investor> getUnhandledInvestors(int investorsNum, int investmentNum) {
      List<Investor> investors = new ArrayList<>();
      for (int i = 0; i < investorsNum; i++) {
          List<Investment> investments = new ArrayList<>();
          Investor investor = new Investor("Some Investor", investments);
          int investmentsNumber = (int) ((Math.random()) * investmentNum);
          for (int j = 0; j < investmentsNumber; j++)
              investments.add(new Investment("Good company", "somebody", 1000L));
          investors.add(investor);
      }
      return investors;
  }
}
public class MyPhaser { 
  private static Phaser phaser = new Phaser();
  
  public static void awaitAdvance() {
      phaser.arriveAndAwaitAdvance();
  }
  
  public static void deregister() {
      phaser.arriveAndDeregister();
  }
  
  public static void register() {
      if (phaser.isTerminated())
          phaser = new Phaser(0);
      phaser.register();
  }
}

Класс бизнес-логики будет отдавать нам необработанные данные об инвесторах методом getUnhandledInvestors(...), которые мы должны отправить на обработку в метод handle(...).
В методе handle(...) мы сделали задержку 1мс для симуляции какой-либо деятельности.

Для начала попробуем последовательный метод. У нас будет 1024 инвестора, которые совершили от 0 до 120 инвестиций за 10 последних лет. (120 - это 12 месяцев на 10 лет. По одному платежу в месяц.)

Использование фазера, на данный момент, не имеет какой-либо практической ценности, но и не мешает выполнению. Он понадобится нам позже, потому добавим сразу.

public static void main(String[] args) {
    List<Investor> investors = BusinessLogic.getUnhandledInvestors(1024, 120);
    MyPhaser.register();
    ExecutionMethod syncHandle = new ExecutionMethod("Последовательное выполнение со случайным порядком инвестиций", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    
    syncHandle.represent();
    syncHandle.setTime(syncHandle(investors));
    MyPhaser.deregister();
}

private static long syncHandle(List<Investor> investors) {
    long start = System.currentTimeMillis(); // TODO Delete
    BusinessLogic.handle(investors);
    long end = System.currentTimeMillis(); // TODO Delete
    System.out.println("Result: " + (end - start) + " ms\n"); // TODO Delete
    return (end - start);
}

Не слишком хороший результат. Если возрастет количество инвесторов или инвестиций, ждать придется еще дольше.

Мы знаем что инвесторы и инвестиции изолированы от других инвесторов и инвестиций. Они не связаны друг с другом и никак не пересекаются.

Потому применим ForkJoin Framework для того, чтобы достичь параллельности в наших вычислениях. Главным отличием ForkJoin от многопоточного выполнения является то, что он задействует ядра компьютера, для распараллеливания и распределения задач по ядрам, в то время как многопоточный подход использует метод квантования времени выполнения, а то есть полезное использование простоев одних потоков другими.

Для начала создадим менеджера, который будет управлять нашими задачами. На моем компьютере больше 5 ядер, потому я выбрал 5. Узнать количество процессоров вы можете методом Runtime.getRuntime().availableProcessors().

public class TaskManager { 
  private final List<Investor> tasks; 
  private final int CORES = 5; 
  private final ForkJoinPool pool = new ForkJoinPool(CORES);
  
  public TaskManager(List<Investor> tasks) {
      this.tasks = new ArrayList<>(tasks);
  }
  public void orderedExec(int thresholdSize, int parts) {
      MyPhaser.register();
      pool.submit(new AsyncOrderedTask(tasks, Math.max(100, thresholdSize), parts));
  }
}

В методе orderedExec(...) параметр thresholdSize - отвечает за минимальное количество инвесторов для последовательной обработки.

Параметр parts - показывает на сколько частей нужно делить коллекцию инвесторов.

Создадим задачу, которая будет разбивать инвесторов на одинаковые группы, и отправлять их выполнять нашу бизнес-логику.

public class AsyncOrderedTask extends RecursiveAction { 
  private final List<Investor> investors; 
  private final int threshold; 
  private final int parts;

  public AsyncOrderedTask(List<Investor> investors, int threshold, int parts) {
      this.investors = investors;
      this.threshold = threshold;
      this.parts = parts;
  }
  
  @Override
  protected void compute() {
      try {
          if (investors.size() < threshold)
              someBusinessLogic();
          else
              divide();
      } finally {
          MyPhaser.deregister();
      }
  }

  private void someBusinessLogic() {
      BusinessLogic.handle(investors);
  }

  private void divide() {
      int size = investors.size(), remains = size % parts, step = size / parts + remains;
      System.out.printf("All size = %d | Step = %d | Parts = %d\n", size, step, parts); // TODO Delete
      for (int i = 0; i < size; i += step) {
          MyPhaser.register();
          int max = Math.min(size, i + step);
          new AsyncOrderedTask(investors.subList(i, max), threshold, parts).fork();
      }
  }
}

Разберем метод divide(). Для начала мы вычисляем шаг цикла (step). В цикле fori мы регистрируем нового участника фазера и создаем задачу на выполнение.

remains - это остаток от деления, для равномерного распределения значений по ядрам. К примеру 1000 на 5 мы легко поделим, получится по 200 инвесторов на ядро.

Тем не менее 1024 инвесторов мы без остатка поделить не сможем.

Если мы не учтем остаток, то после выполнения первым ядром своей задачи в 204 инвестора, оно возьмет еще 4х остаточных инвесторов на выполнение.

Разбивка без учета остатка будет выглядеть следующим образом:
1 ядро. 204 инвестора
2 ядро. 204 инвестора
3 ядро. 204 инвестора
4 ядро. 204 инвестора
5 ядро. 204 инвестора
1 ядро. 4 инвестора

С учетом же остатка, ядра отработают в следующем порядке.
1 ядро. 208 инвестора
2 ядро. 208 инвестора
3 ядро. 208 инвестора
4 ядро. 208 инвестора
5 ядро. 192 инвестора

Гораздо лучше! Так же вы могли заметить строку "int max = Math.min(size, i + step);". Это для того, что бы не выйти за границы коллекции. "5 ядро. 192 инвестора" как раз нам это демонстрирует.

С разбором закончили, переходим к тестам!

Далее мы больше не будем проверять последовательный подход, потому просто его уберем.

public static void main(String[] args) {
    List<Investor> investors = BusinessLogic.getUnhandledInvestors(1024, 120);
    MyPhaser.register();
    ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");

    commonMethodRandom.represent();
    commonMethodRandom.setTime(commonMethod(investors, 5));
    MyPhaser.deregister();
} 

private static long commonMethod(List<Investor> investors, int parts) {
    TaskManager manager = new TaskManager(investors);
    long start = System.currentTimeMillis();
    manager.orderedExec(investors.size()  / 4, parts);
    MyPhaser.awaitAdvance();
    long end = System.currentTimeMillis();
    System.out.println("Result: " + (end - start) + " ms\n"); // TODO Delete
    return (end - start);
}

Совсем не плохо! Благодаря случайной последовательности мы получаем практически равномерно-распределенное количество инвестиций. Давайте посмотрим, что будет, если мы получим последовательные данные. Для этого добавим в метод main некоторый код.

public static void main(String[] args) {
    List<Investor> investors = BusinessLogic.getUnhandledInvestors(1024, 120);
    MyPhaser.register();
    ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod commonMethodOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций (Худший вариант)", "Пример: [1,2,3,4,5,6,7,8,9,10]");

    commonMethodRandom.represent();
    commonMethodRandom.setTime(commonMethod(investors, 5));

    investors.sort(Comparator.comparingInt(dividedStep -> dividedStep.getInvestments().size()));

    commonMethodOrdered.represent();
    commonMethodOrdered.setTime(commonMethod(investors, 5));

    MyPhaser.deregister();
} 

Уже не так весело, не правда ли?

Вот здесь нам и пригодится знание алгоритмов. В частности мы будем использовать алгоритм «Two pointers».

Создадим задачу, которая будет использовать данный алгоритм для разбития инвесторов на примерно однородные группы.

public class AsyncTwoPointerTask extends RecursiveAction {
  private final List<Investor> investors;
  private final int threshold; 
  private final int parts; 
  private final boolean asPool;
  
  public AsyncTwoPointerTask(List<Investor> investors, int threshold, int parts, boolean asPool) {
        this.investors = investors;
        this.threshold = threshold;
        this.parts = parts;
        this.asPool = asPool;
  }

  @Override
  protected void compute() {
        try {
            if (investors.size() < threshold)
                someBusinessLogic();
            else
                divide();
        } finally {
            MyPhaser.deregister();
        }
    }

    private void someBusinessLogic() {
        BusinessLogic.handle(investors);
    }

    private void divide() {
        int size = investors.size(), remains = size % parts, step = size / parts, dividedStep = step / 2, leftEnd = 0, leftStart, rightStart = size, rightEnd;
        System.out.printf("All size = %d | Step = %d | Parts = %d\n", size, asPool ? step / 2 : step, asPool ? parts * 2 : parts); // TODO Delete
        while (leftEnd < rightStart) {
            MyPhaser.register();
            leftStart = leftEnd;
            rightEnd = rightStart;
            leftEnd += Math.min(rightEnd - leftStart - dividedStep, dividedStep + remains);
            rightStart -= dividedStep;
            if (asPool) {
                MyPhaser.register();
                new AsyncTwoPointerTask(investors.subList(leftStart, leftEnd), threshold, parts, asPool).fork();
                new AsyncTwoPointerTask(investors.subList(rightStart, rightEnd), threshold, parts, asPool).fork();
            } else
                new AsyncTwoPointerTask(
                        Stream.of(investors.subList(leftStart, leftEnd), investors.subList(rightStart, rightEnd))
                                .flatMap(Collection::stream).collect(Collectors.toList()), threshold, parts, asPool).fork();
        }
    }
}

Метод divide() теперь сужается к центру. Слева он собирает в коллекцию инвесторов с самым маленьким количеством инвестиций, справа - с самым большим.

Потом объединяет их и отдает на исполнение. Таким образом мы достигаем однородности задач на ядро.

Вспомните пример с вычислением первого и последнего числа в примере с «Суммой арифметической прогрессии», о чем говорили ранее. 11+11+11+11+11=55.

Так же претерпели изменения некоторые переменные, и появилась переменная asPool, которую мы вызываем из вне. Таким образом мы делаем подобие пула потоков, где после того как поток заканчивает выполнение одной задачи - берет другую (Переиспользование потоков в пуле). Для того, чтобы симулировать пул в классе AsyncOrderedTask, достаточно увеличить в классе main количество частей, на сколько мы хотим разбить коллекцию инвесторов. Эти методы будут доступны в полном коде, в конце статьи, разбирать мы их не будем.

Добавим метод в TaskManager.

public void twoPointersExec(int thresholdSize, boolean asPool) {
    MyPhaser.register();
    pool.submit(new AsyncTwoPointerTask(tasks, Math.max(100, thresholdSize), CORES, asPool));
}

Теперь перейдем в метод main и поправим его следующим образом:

public static void main(String[] args) {
    List<Investor> investors = BusinessLogic.getUnhandledInvestors(1024, 120);
    MyPhaser.register();
    ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod commonMethodOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций (Худший вариант)", "Пример: [1,2,3,4,5,6,7,8,9,10]");
    ExecutionMethod twoPointerExecutionRandom = new ExecutionMethod("Метод со случайным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod twoPointerExecutionOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [1,2,3,4,5,6,7,8,9,10]");

    commonMethodRandom.represent();
    commonMethodRandom.setTime(commonMethod(investors, 5));

    twoPointerExecutionRandom.represent();
    twoPointerExecutionRandom.setTime(twoPointerExecution(investors, false));

    investors.sort(Comparator.comparingInt(dividedStep -> dividedStep.getInvestments().size()));

    commonMethodOrdered.represent();
    commonMethodOrdered.setTime(commonMethod(investors, 5));

    twoPointerExecutionOrdered.represent();
    twoPointerExecutionOrdered.setTime(twoPointerExecution(investors, false));

    MyPhaser.deregister();
} 

private static long twoPointerExecution(List<Investor> investors, boolean asPool) {
    TaskManager manager = new TaskManager(investors);
    long start = System.currentTimeMillis();
    manager.twoPointersExec(investors.size() / 4, asPool);
    MyPhaser.awaitAdvance();
    long end = System.currentTimeMillis();
    System.out.println("Result: " + (end - start) + " ms\n"); // TODO Delete
    return (end - start);
}

Мы видим что при случайном порядке инвестиций, при использовании алгоритма «Two pointers», практически нет никакого выигрыша по отношению к обычному последовательному вычислению.

Иногда он может даже проигрывать в скорости, но не значительно. Строго говоря, однородность задач, при случайном порядке инвестиций, мы так же получаем случайно. Потому абсолютно однородными их назвать сложно.

Однако при отсортированном порядке инвестиций мы видим либо, практически, неизменность либо увеличение скорости вычисления.

Обычное последовательное вычисление здесь проигрывает полностью.

Получается что преимуществом однородных задач является не только скорость, но и неизменность скорости вычисления, независимо от порядка и «тяжести» входящих данных.

Мы раскрыли преимущество создания однородных задач, используя математику и алгоритмы!

Так же мы рассматривали варианты использования подобия пула. Для этого поменяем метод main следующим образом.

public static void main(String[] args) {
    List<Investor> investors = BusinessLogic.getUnhandledInvestors(1024, 120);
    MyPhaser.register();
    ExecutionMethod syncHandle = new ExecutionMethod("Последовательное выполнение со случайным порядком инвестиций", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod commonMethodRandom = new ExecutionMethod("Метод со случайным порядком инвестиций (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod commonMethodRandomPool = new ExecutionMethod("Метод со случайным порядком инвестиций по типу пула (Практически равномерное распределение)", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod twoPointerExecutionRandom = new ExecutionMethod("Метод со случайным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod twoPointerExecutionRandomPool = new ExecutionMethod("Метод со случайным порядком инвестиций и применением алгоритма Two Pointers по типу пула", "Пример: [4,1,10,6,2,9,3,8,7,5]");
    ExecutionMethod commonMethodOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций (Худший вариант)", "Пример: [1,2,3,4,5,6,7,8,9,10]");
    ExecutionMethod commonMethodOrderedPool = new ExecutionMethod("Метод с отсортированным порядком инвестиций по типу пула (Худший вариант с пулом)", "Пример: [1,2,3,4,5,6,7,8,9,10]");
    ExecutionMethod twoPointerExecutionOrdered = new ExecutionMethod("Метод с отсортированным порядком инвестиций и применением алгоритма Two Pointers", "Пример: [1,2,3,4,5,6,7,8,9,10]");
    ExecutionMethod twoPointerExecutionOrderedPool = new ExecutionMethod("Метод с отсортированным порядком инвестиций и применением алгоритма Two Pointers по типу пула", "Пример: [1,2,3,4,5,6,7,8,9,10]");

    syncHandle.represent();
    syncHandle.setTime(syncHandle(investors));

    commonMethodRandom.represent();
    commonMethodRandom.setTime(commonMethod(investors, 5));

    commonMethodRandomPool.represent();
    commonMethodRandomPool.setTime(commonMethod(investors, 10));

    twoPointerExecutionRandom.represent();
    twoPointerExecutionRandom.setTime(twoPointerExecution(investors, false));

    twoPointerExecutionRandomPool.represent();
    twoPointerExecutionRandomPool.setTime(twoPointerExecution(investors, true));

    investors.sort(Comparator.comparingInt(dividedStep -> dividedStep.getInvestments().size()));

    commonMethodOrdered.represent();
    commonMethodOrdered.setTime(commonMethod(investors, 5));

    commonMethodOrderedPool.represent();
    commonMethodOrderedPool.setTime(commonMethod(investors, 10));

    twoPointerExecutionOrdered.represent();
    twoPointerExecutionOrdered.setTime(twoPointerExecution(investors, false));

    twoPointerExecutionOrderedPool.represent();
    twoPointerExecutionOrderedPool.setTime(twoPointerExecution(investors, true));

    System.out.println("Результаты:"); // TODO Delete
    commonMethodRandom.showExecutionTime();
    twoPointerExecutionRandom.showExecutionTime();
    twoPointerExecutionRandomPool.showExecutionTime();
    commonMethodOrdered.showExecutionTime();
    commonMethodOrderedPool.showExecutionTime();
    twoPointerExecutionOrdered.showExecutionTime();
    twoPointerExecutionOrderedPool.showExecutionTime();

    System.out.println("\nНаш победитель:"); // TODO Delete
    ExecutionMethod method = Stream.of(commonMethodRandom,
            twoPointerExecutionRandom,
            twoPointerExecutionRandomPool,
            commonMethodOrdered,
            commonMethodOrderedPool,
            twoPointerExecutionOrdered,
            twoPointerExecutionOrderedPool).min(Comparator.comparingLong(ExecutionMethod::getTime))
            .orElse(new ExecutionMethod("Пустой", "Пустой"));
    method.showExecutionTime();
    MyPhaser.deregister();
}

А результаты вы уже сами посмотрите :-)

Комментарии (15)


  1. SpiderEkb
    08.12.2023 05:38
    +4

    Не очень понятен вот такой момент:

    С одной стороны

    Представьте что у вас есть 10 задач. Каждая пронумерована от 1 до 10, а так же каждая задача выполняется секунд, равным номеру задачи. 1я задача - 1 секунда, 10я задача - 10 секунд - она самая "тяжелая".

    Т.е. задачи не являются однородными.

    А далее вы начинает рассуждать о

    Представьте. Мы работаем в инвестиционной компании, которая недавно открылась. Нам присылают большой пакет необработанных данных, в котором содержатся инвесторы и инвестиции, которые они инвестировали в разные компании в течении последних 10 лет.

    Наша задача состоит в том, что бы получить эти данные и отправить их обрабатываться в другой сервис со своей бизнес логикой.

    Здесь уже речь идет об однородной обработке - каждая инвестиция от каждого инвестора обрабатывается одинаковым образом и независимо от остальных. О чем Вы явно говорите ниже:

    Мы знаем что инвесторы и инвестиции изолированы от других инвесторов и инвестиций. Они не связаны друг с другом и никак не пересекаются.

    Т.е. здесь неожиданно перешли к набору однородных задач.

    Дальше мне трудно комментировать - это какая-то специфика джавы, на мой взгляд неоправданно усложненная. Немного расскажу как у нас. Не джава, нормальный компилируемый язык. Сервер на процессорах IBM Power E980. В сумме там 120 процессорных ядер Power9 (точно не скажу сколько там процессоров и сколько в каждом ядер). И каждое ядро работает в 8 потоков (SMT8).

    Для распараллеливания мы не используем потоки (threads), вместо это каждая копия обработчика (а они все одинаковые - см. выше) запускается в отдельном, полностью изолированном от остальных, фоновом задании (batch job). Это и безопасно - даже полное падение одного задания никак не затрагивает все остальное, что работает на сервере (а там еще десятки тысяч других заданий в этот момент крутятся и свои бизнес-процессы обслуживают), и удобнее для сопровождения - они могут мониторить каждое задание, видеть как оно потребляет ресурсы, при необходимости остановить или даже прибить задание (и это опять никак не повлияет на остальные).

    Т.е. каждый обработчик запускается в отдельном фоновом задании в своем окружении. В С это делается функцией spawn, в нашей системе есть команда SBMJOB (Submit Job).

    На каком процессоре и в каком ядре будет выполняться тот или иной обработчик нас не интересует - этим занимается система. Тем более, что наша задача далеко не единственная, которая в данный момент работает на сервере и вопрос надо решать глобально, для всей системы в целом, а не для одной конкретной задачи. Поэтому все пляски с ForkJoin нам не нужны - система сделает это за нас лучше.

    А дальше так.

    • Есть одно головное задание - master. Запускаем его. Его задачи: создать конвейер (об этом ниже), запустить нужное количество копий (фоновых заданий, batch job) обработчиков (worker) и заняться своим прямым делом - отбором данных (в нашем случае данные могут формироваться в результате выполнения достаточно сложного, а иногда и не одного а нескольких последовательно SQL запросов по нескольким таблицам с кучей условий), упаковкой данных в пакеты (пакет - набор из нескольких - 10, 100, 1000 - элементов выборки) и выкладкой пакетов на конвейер. По окончании выборки master выкладывает на конвейер нужное количество пустых пакетов - терминаторов (по количеству обработчиков).

    • Есть конвейер. Это механизм передачи данных из одного изолированного задания в другое. Например, это может быть пайп (pipe). На нашей платформе есть более удобные механизмы - очереди данных (Data Queue и User Queue). При чтении очередного пакета он автоматически удаляется с конвейера. Совместный доступ к конвейеру (запись со стороны master'а и чтение со стороны worker'ов) обеспечивается системой - тут тоже нам не надо напрягаться.

    • Есть обработчик - worker. Он берет очередной пакет с конвейера, обрабатывает содержащиеся в нем данные и идет за следующим пакетом. И так пока не закончатся данные - как только worker прочитал пакет-терминатор, он просто завершает работу (с закрытием своего фонового задания).

    Особенности тут такие:

    • Скорость конвейера в общем случае не критична т.к. время обработки пакета всегда выше времени его формирования, выкладки на конвейере и чтения с конвейера.

    • На больших объемах данных равномерность загрузки обработчиков достигается автоматически - как пример: 50 000 000 элементов выборки (500 000 пакетов по 100 элементов), 10 обработчиков стартуют одновременно и завершают работу в интервале 200-300мсек при общем времени работы в несколько часов. При этом количество обработанных пакетов для каждого обработчика разное т.к. время обработки для каждого отдельного элемента может варьироваться.

    • Если конвейер контролируемый (в нашем случае для очередей данных есть операция "материализации очереди", которая дает среди прочей информации текущее количество элементов в очереди), возможен контроль состояния конвейера и динамическая балансировка - если скорость выкладки на конвейер сильно выше скорости разбора - не хватает обработчиков, можно запустить еще одну или несколько копий. И наоборот - скорость разбора выше скорости выкладки (обработчики "голодают") - обработчиков больше чем нужно, можем послать одному или нескольким команду на завершение, уменьшив тем самым общую нагрузку на систему.

    Таких задач, где требуется независимая параллельная обработка нескольких десятков миллионов записей, у нас достаточно много. Есть т.н. "скелетон" - готовый шаблон где разработчику фактически нужно только описать структуру одного элемента в пакете, написать процедуру формирования пакета (с отбором данных) для master-задания и процедуру обработки элемента для worker. Ну и собрать все это подключив модули где реализована вся "многопоточка" - запуск обработчиков, контроль их состояния, работа с конвейером и т.п.


    1. Borlok Автор
      08.12.2023 05:38

      Здравствуйте! Спасибо, что так объемно описали механизм работы, который используете.

      Каждая инвестиция от каждого инвестора обрабатывается одинаковым образом

      Тут вы несомненно правы. В задачах же мы обрабатываем именно инвесторов, а тяжестью, или неоднородностью, является именно количество инвестиций на одного инвестора. То есть когда мы берем инвестора с 0 инвестиций, он обрабатывается мгновенно. Когда берем инвестора со 120 инвестициями, он обрабатывается примерно за 120 мс.

      Думаю у нас несостыковка понятий) Поправьте, если ошибаюсь, но похоже что заданиями или задачами у вас называется то, что у нас называется репликами микросервисов. Это некоторое подобие копий мини серверов, которые могут общаться друг с другом(могут и не общаться), работают изолированно, могут иметь собственную БД и выполняют узкоспециализированные задачи. Очередями выступают Kafka, RabbitMQ и им подобные.

      Задачами в статье я называю RecursiveTask - классы из ForkJoin Framework в Java, в которых мы описываем задачу на выполнение, если она подходит под условия, если же не подходит, она делится на подзадачи.

      P.S. У вас впечатляющие объемы данным)


      1. SpiderEkb
        08.12.2023 05:38

        Тут вы несомненно правы. В задачах же мы обрабатываем именно инвесторов, а тяжестью, или неоднородностью, является именно количество инвестиций на одного инвестора. То есть когда мы берем инвестора с 0 инвестиций, он обрабатывается мгновенно. Когда берем инвестора со 120 инвестициями, он обрабатывается примерно за 120 мс.

        Тогда извиняюсь - я просто не так Вас понял.

        Действительно (у нас также бывает) - каждый элемент обрабатывается по одному алгоритму, но время, потребное на его обработку, может варьироваться в силу разного объема неких связанных с ним данных.

        В нашем подходе (на больших объемах) это не играет никакой роли - все это нивелируется. Одному обработчику достанется пакет со "сложными" элементами, другому - с "простыми". Но поскольку пакетов сотни тысяч, а обработчиков (обычно) 5 - 10, то в результате получается что один обработчик обработал 10 000 "сложных" пакетов, второй за это же время - 30 000 "простых", третий 20 000 "средних" и т.п. (цифры весьма условные, конечно же).

        Поправьте, если ошибаюсь, но похоже что заданиями или задачами у вас называется то, что у нас называется репликами микросервисов. Это некоторое подобие копий мини серверов, которые могут общаться друг с другом(могут и не общаться), работают изолированно, могут иметь собственную БД и выполняют узкоспециализированные задачи. Очередями выступают Kafka, RabbitMQ и им подобные.

        Все не так. Речь идет о банке (то, что крутится на центральных серверах - автоматизированная банковская система, вся внутренняя логика банка) Мы работаем на платформе IBM i (основной язык - RPG, но не суть, что-то, в частности ядро вот этой самой "многопоточной обработки", пишется и на С/С++).

        Это еще не мейнфреймы, IBM позиционирует эту платформу как middleware - изначально это были коммерческие сервера для малого и среднего бизнеса, но получилось очень мощно и масштабируемо и сейчас и в крупный бизнес из использует успешно. Но принципы работы тут как на "больших" машинах. Вся работа - через эмулятор терминала IBM5250 (когда-то давно это были реальные аппаратные терминалы, сейчас - программные эмуляторы, например, в составе пакета IBM i Access Client Solutions)

        Все, что тут происходит, происходит в рамках какого-то задания (job). Ну некий аналог "процесса" в win или linux, но пошире.

        Подключились терминалом - запустилось интерактивное задание (подсистема QINTER)

        Если вы запустите программу (команда CALL ...), то она запустится в этом же задании - тут вы можете увидеть как-то вывод от программы ну и, понятно, не сможете в этом задании ничего делать пока не завершится ваша программа. Если она из себя вызовет другую программу, то та тоже будет работать в этом же задании и ваша программа будет ждать пока не завершится вызванная. Короче, в рамках задания работа синхронная.

        Но мы можем запустить программу и параллельно, в фоновом (подсистема BATCH) задании. Для этого есть системная команда SMBJOB. В терминале так:

        В коде примерно так:

        cmdSbmJob = 'SBMJOB CMD(CALL PGM(' + cPgmName + ') PARM(''D'')) JOB(' + JobNam +')';
        CmdExc(cmdSbmJob : %len(%trim(cmdSbmJob)));

        Или (в С/С++) используем spawn

        m_pWorkers[i].id = spawn(m_WrkPgmPath, 0, NULL, &inherit, argv, envp);

        что дает ровно тот же результат на этой платформе.

        Т.е. все, что тут работает, оно всегда работает в каком-то задании. Задания полностью изолированы друг от друга. У каждого задания своя память, свой joblog, своя очередь сообщений... Могут быть свои права доступа к каким-то объектам. Вплоть до того, что можно в задании запрещать или разрешать использовать потоки (у нас поддерживаются posix threads). Если запрещено, попытка запустить поток (pthread_create) вернет ошибку

        [EBUSY] The system cannot allow thread creation in this process at this time.

        Естественно, что управление заданиями полностью лежит на системе. На каком процессоре, в каком ядре оно будет выполняться - за все это отвечает система и мешать в этом ей не надо - одновременно на сервере работает тысячи, если не десятки тысяч заданий помимо вашего.

        Что касается очередей. Это не Кафка, не Кролик. Хотя Кафка у нас, вроде как есть (помимо оригинальной IBM MQ). Но в данном случае речь не о них - они для другого. Тут речь о системных объектах *DTAQ (Data Queue) и *USRQ (User Queue). Они достаточно похожи по принципам работы. Фактически это список - он может быть FIFO, LIFO или KEYED (когда каждый блоке данных снабжается ключом и можно брать не "первый доступный", а "первый, подходящий под условие для заданного значения ключа").

        *DTAQ более мощная в плане возможностей (в частности, ее содержимое сохраняется на диске), *USRQ попроще (на диске хранится только описание объекта, все содержимое только в памяти), но в 3-4 раза быстрее и, главное и для нас существенное, она в 3-4 раза меньше ресурсов потребляет. Для работы с *DTAQ есть системные API, работа с *USRQ только программно - на низком уровне через "машинные инструкции" (MI). Впрочем, эту проблему я решил у нас - сделал достаточно полный USRQ API (включая команды для работы с очередью в терминале, SQL интерфейсы для работы с очередью, ну и сам API для простой работы из С/С++ или RPG).

        Так что в качестве конвейера мы используем или обычные пайпы (универсально - есть на любой платформе) или *USRQ. (более удобно, но специфично для нашей платформы).

        Но суть одна - и master и worker'ы подключаются к одному конвейеру, master туда кладет пакеты (write), worker'ы разбирают их оттуда (read). При чтении пакета он автоматически удаляется с конвейера (если специально не делать peek - неудаляющее чтение).

        У вас впечатляющие объемы данным

        Ну у нас клиентов 50млн. А с каждым еще куча всяких данных связано - счета, карты, клиентские данные (документы, адреса и т.п.).

        Вот задача - "ежегодная актуализация клиентов". Нужно отобрать всех клиентов не ЮЛ (ФЛ, самозанятые, ИП... - все у кого есть ДУЛы - документы удостоверяющие личность) и для каждого проверить корректность адресов и сроки окончания действия ДУЛ. Там 5 выборок разных. А по одной из них еще подвыборка - надо проверить не только самого клиента, но и его держателей карт (когда владелец счета выпускает карту не для себя, а для третьего лица). Если все ок, то у клиента обновляется "дата актуализации". Иначе - пишем в отчет кто почему не прошел. В сумме это порядка 25млн клиентов на сегодняшний день. Запускается это раз в год и работает 2+ часа в 10 обработчиков.

        Есть и хуже. Есть "списки комплаенс". Росфинмониторинг регулярно присылает списки всяких злодеев-бармалеев которым надо ограничивать платежи и т.п. Всего 5 типов списков разных. Все они принимаются и раскладываются по нашей БД. Нам нужно проверять - есть ли совпадения наших клиентов с субъектами списков по имени (ФЛ)/наименованию (ЮЛ), ДР (плюсом к имени для ФЛ), ИНН, ДУЛ (для ФЛ), адресам (которых у клиента несколько типов - почтовый, регистрации, фактического пребывания и т.п.)... И это с учетом того, что у субъектов списков может быть несколько имен, несколько ДУЛ и т.п. -

        она же Анна Ефидоренко, она же Элла Кацнельбоген, она же Людмила Огуренкова, она же Изольда Меньшова, она же Валентина Панеяд.

        Итого имеем 50млн клиентов и 300-500 тысяч субъектов. И надо сравнить всех со всеми, если найдены совпадения по данным, занести их в т.н. "стоплист". И не просто добавить, а смотреть - на этого клиента было совпадение с этим субъектом по имени, а сейчас еще и по ДУЛ появилось - надо изменить запись, а для этого совпадений не было, а сейчас появилось - добавляем, для того было, сейчас нет (исключили из списка субъекта) - удаляем запись...

        Делается это или каждый день (потому что клиентские данные меняются и может появится совпадение) или после загрузки очередного списка (изменения в субъектах).

        Решение "в лоб" (тупо "каждый с каждым") - это 12 и более часов в 10 потоков. Что неприемлемо. Поэтому извращаемся как можем - ежедневно проверяем только тех клиентов, у которых есть изменения в данных (что резко сокращает объем выборки). После загрузки списка идет обратно - от субъектов. И проверяем только тех, у кого были изменения в последней версии списка. Что тоже сокращает объемы кратно. Плюс еще ряд хитрых вывертов, позволяющих сократить время обработки до пределов часа. Но тут огромная работа и аналитика и разработчика заложена. И еще ряд смежных задач попутно реализован.


    1. murkin-kot
      08.12.2023 05:38

      Обращал ранее внимание на ваши усилия по обнародованию используемого в вашей фирме подхода. Вы везде упирали на повышенное внимание к эффективности использования вычислительных ресурсов. Но как-то у меня не складывается картина именно эффективности.

      Вы утверждаете, что ради восьми часов работы какого-то алгоритма (скорее всего расчёт начисления процентов) закуплен мощный сервер с очень дорогим железом (120 ядер по 8 потоков каждое, ну и т.д.). Отсюда вопрос - а зачем вам все остальные 30.5*24-8 часов нужен этот сервер? Разве это эффективно?

      Понимаю, что приведён лишь пример, но суть остаётся прежней, пусть у вас 50 млн. клиентов и по ним нужно начислять проценты по ряду продуктов, пусть по 10 продуктов, требующих начислений, на каждого клиента в среднем, итого - 500 млн. ежемесячных начислений, или 8*10=80 часов, или 3-е суток и 8 часов. Что делает сервер всё остальное время?

      Ну и для полного понимания контекста - вы, видимо, один из разработчиков, отвечающих за эти самые начисления. Я правильно понимаю? Это к тому, что, похоже, у вас наблюдается профессиональное искажение в сторону "видеть во всём гвозди", то есть в сторону максимального внимания к производительности, но, в ущерб всему остальному.

      Почему я так думаю? Очень просто - 50М клиентов не могут каждый день выполнять по тысяче операций - они очень быстро устанут. Значит среднее количество операций в день на человека можно (грубо) считать 1-3, пусть будет 2 операции в сутки на каждого в среднем. Получаем 100М операций в сутки, или 100 000 / 86 = 1162 в секунду. Выполнить около тысячи вычислений, обычно достаточно примитивных, вроде суммы или тех же процентов, любой современный процессор может за микросекунды, ну пусть несколько миллисекунд. Поэтому всё упрётся в необходимость прочитать данные, а потом сохранить результат расчёта, что сразу уводит нас в далёкую даль от представленного вами 120-ядерного молотильника для данных. Поэтому и возникает вопрос - как вы можете оправдать наличие явно избыточных мощностей при вашем постоянно подчёркиваемом стремлении к эффективности?


      1. SpiderEkb
        08.12.2023 05:38

        Вы утверждаете, что ради восьми часов работы какого-то алгоритма (скорее всего расчёт начисления процентов) закуплен мощный сервер с очень дорогим железом (120 ядер по 8 потоков каждое, ну и т.д.). Отсюда вопрос - а зачем вам все остальные 30.5*24-8 часов нужен этот сервер? Разве это эффективно?

        Вы неправильно поняли.

        1. Мощный сервер не один. По-моему, из 2 или даже 3 + горячий резерв.

        2. На этих серверах крутится не одна конкретная задача, а вся автоматизированная банковская система (АБС). Т.е. вся банковская логика.

        Т.е. (не самые свежие оценки)

        АБС это:
        - 27 тыс. программных объектов
        - 15 тыс. таблиц и индексов базы данных
        - Более 150 программных комплексов
        - Занимает более 30 Терабайт дискового пространства
        - В день изменяется более 1,5 Терабайт информации в БД
        - За день выполняется более 100 млн. бизнес операций
        - Одновременно обслуживается более 10 тыс. процессов
        - За год происходит более 1100 изменений в алгоритмах программ

        Ежедневно в АБС выполняется процедура закрытия банковского дня (EOD), в которой выполняется обслуживание всех без исключения сделок и бизнес сущностей.

        В процедуре закрытия дня производится более 500 миллионов изменений в БД, при этом eё длительность составляет около 4 часов.

        Это то, что происходит на нашем сервере. Потому он и мощны. Потому мы и убиваемся за эффективность - на нем работает mission critical мастер-система

        А есть еще "внешние системы"

        К АБС подключено более 60 банковских систем, обеспечивающих работу различных бизнес направлений.
        Большинство фронт приложений используют АБС, как основной источник информации и функциональности.

        Которые постоянно "ходят к нам" со всякими запросами. Ткнул пользователь в мобильном приложении "история операций" - к нам прилетел запрос на формирование выписки по карте или по счету - мы должны пошуршать по БД и отдать соотв. набор данных.

        Ткнул "перевод по номеру телефона" - в конечном итоге прилетает к нам - формируется "платежный документ", который потом прогоняется через систему контроля платежей (порядка 10-ка проверок по результатам которых выносится решение о том, пропустить платеж или отправить его в финмон на ручной контроль или авторизацию). За сутки - несколько сотен миллионов платежей проходит через систему расчетов.

        Есть рассылки уведомлений клиентов (типа "у вас через ... дней закончится срок действия паспорта" и т.п.).

        Регулярная обязательная банковская отчетность.

        Уведомления во всякие структуры (открывает клиент счет - уведомление в ФНС и т.п.)

        Приходит человек, становится клиентом, открывает счет, заказывает карту - все это к нам.

        Я даже затруднюсь перечислить все, что происходит на наших центральных серверах - так там всего много.

        При этом никаких облаков. Все только на своих серверах и изолировано от внешнего мира (без прямого доступа к серверам извне).


        1. murkin-kot
          08.12.2023 05:38
          +1

          Спасибо за статистику. Скажу честно - всё равно не вижу потенциала для столь мощного сервера. Надеюсь, что старое правило, гласящее "дурной работой можно загрузить любую машину", не совсем для вас, но я реально не вижу необходимости в 960 потоках, доступных на вашем железе.

          За день выполняется более 100 млн. бизнес операций

          Это относительно объективный показатель. Остальные цифры зависят от субъективно выбранного вами способа решения задач.

          Ещё есть менее объективный, но тем не менее полезный показатель:

          В процедуре закрытия дня производится более 500 миллионов изменений в БД

          Эти показатели вполне укладываются в мои грубые прикидки в сообщении выше - 2 транзакции на клиента в день, плюс, видимо, по каждому клиенту в среднем имеется десяток счетов, по которым нужно начислять проценты, правда непонятно, зачем это делать каждый день. Хотя, возможно, от вас требуют перезапись состояний всех счетов каждый день (например сальдо на день), но тогда это уже несколько маразматичный подход со стороны бизнеса.

          Предполагаю, что всё упирается в чтение-запись на диски, а вычислительная составляющая большую часть времени сильно недогружена. Не пробовали оптимизировать именно ввод-вывод?


          1. SpiderEkb
            08.12.2023 05:38

            Вы просто не представляете себе работу банка. Для вас это только счета-проводки. Мне тоже так когда-то казалось.

            На самом деле это еще огромное количество клиентских данных и работы с ними. Это огромное количество регулярной обязательной отчетности. Это большое количество разного рода уведомлений рассылаемых как клиентам, так и различным органам.

            Я даже затруднюсь рассказать про все что там работает - просто не знаю всего. Но поверьте, это очень много - тут одно изменение какого-то поля в одной таблице может потащить за собой длинную цепочку разных действий - изменения в других таблицах, отсылка каких-то уведомлений и т.д. и т.п. Я вот практически не работаю с денежными операциями (это вообще отдельные команды - Система Расчетов, Тарифный, Депозитный и Лимитный модули, Модуль пластиковых карт, Универсальный кассовый модуль...). Я работаю с клиентскими данными и комплаенсом. И у нас очень много реально тяжелых по ресурсам задач, требующих обработки больших объемов данных (про сверку я писал уже тут - ежедневная операция).

            И " транзакции в среднем на клиента в день" - это оценка ниже нижнего. Потому что много клиентов ЮЛ, в т.ч. и крупных, у которых десятки счетов и по ни сотни транзакций за сутки гоняется. Т.е. "сотни миллионов транзакций" - это не 100 миллионов, а может и 500-600. И каждая проходит через цепочку проверок.

            И это ежедневное сведение баланса - тот самый EOD.

            И это высокие требования к надежности всего этого. Время недостуности системы нормируется минутами. Дальше - санкции и штрафы от регулятора.

            И да. В нормальных условиях сервер нагружен где-то на 50-60%. Потому что есть периоды пиковых нагрузок (в частности перед НГ), когда загрузка сервера достигает 90%. И с середины декабря и по середину января действует мораторий на внедрение новых поставок. Потому что не дай бог что.


            1. murkin-kot
              08.12.2023 05:38

              Вы перечислили логические сущности, но не вдавались в подробности именно вычислительных задач.

              Вообще, именно так все вокруг и приходят к микросервисам и прочим небыстрым решениям. Но от вас я ожидал каких-то чудес, ведь вы очень сильно упирали на производительность кода. Тем не менее вижу, что и вы тоже быстро переходите на логические единицы (или бизнес-задачи), что бы упростить себе жизнь.

              Это не критика, потому что понятно - так намного проще работать. Вы тоже пришли к этому выводу, хоть и не согласились с ним. То есть вывод вы не озвучили, но фактически пошли тем же путём, которым идут все сторонники микросервисов. Получается - имеет место неосмысленное принятие критикуемого вами подхода. Вот такая занимательная ситуация.


              1. SpiderEkb
                08.12.2023 05:38

                Скажите, вы с IBM i работали? Как оно устроено представляете? С банковским сервером работали?

                Вообще, то, что вы писали хоть когда-нибудь проходило нагрузочное тестирование с претензиями типа

                Параметры вызовов использованы из промышленного USPLOGPF ( USPVAL like '%U01%'. ), в колл-ве 34 тыс.

                PBC01U01R PEX stat

                Из PEX статистики работы PBC01U01R видно, что 33% времени и 36% ресурсов CPU тратится на выполнение QSQRPARS в программе STL#CHKN, т.е. парсинг статических выражений при подготовке SQL запроса,

                Сократить данные русурсозатраты практически до нуля можно путем описания параметров sql запросов через SQL Descriptor Area (SQLDA).

                Поскольку CU130 один из наиболее активно используемых сервис модулей, необоснованное повышенное ресурсопотребление является малодопустимым. Просьба инициировать доработку STL#CHKN.

                вам заворачивали поставку на доработку?

                У нас НТ на копии промсреды обязательно для каждой поставки. Это к вопросу эффективности кода.

                Если хотите каких-то чудес, то сначала почитайте вот это: Ф.Солтис Основы AS/400 Просто чтобы понимать на чем и как все это работает. Если что, банк на этом работает где-то года с 99-го. И уже не одно поколение серверов сменилось (точно знаю что когда-то, еще до меня, были Power7, я пришел уже на Power8, сейчас Power9). И, уж поверьте, деньги тут считать умеют и любое решение рассматривается прежде все с точки зрения "сколько на этом заработает банк".

                И, поверьте, микросервисы - это всего лишь модный тренд. И не надо пихать из везде и всюду. Есть и другие подходы. Например, модель акторов (элементы которой у нас присутствуют в том числе).

                И, поверьте, скорость введения новых фич на нашем уровне не волнует никого. Волнует производительность (в т.ч. устойчивость к пиковым нагрузкам) и надежность.

                То что мы мыслим логическими задачами - ну так работает банк. Это не какой-то один процесс, их тысячи и десятки тысяч. И все одни разнородны. И за каждый процесс отвечает како-то модуль или несколько модулей. Пытаться описать как все это работает - слишком долго и скучно. Здесь десятки команд по разным направлениям. Комплаенсом и клиентскими данными занимаются одной, система расчетов и контроль платежей - другие, Тарифы, лимиты, депозиты - еще кто-то. Есть задачи на стыке - вот я (в основном по комплаенсу работаю) делал комплекс проверок (в основном связанных с комплаенсом, но не только) для системы расчетов. Они задали контракт для вызова, я по этому контракту дела диспетчер + набор проверок + инструменты для настроек все этого. При этом как у них там обрабатывается платеж я совершенно не в курсе - это их кухня, не моя. Знаю, что они в нужном месте встроили вызов моего диспетчера в свой процесс.

                Есть "единый сервис проверок". Там много чего проверяется. Там тоже делал что-то по линии комплаенса. И вызываться оно может в итоге из 100500 мест, мне не ведомых. Есть контракт, есть бизнес-требования - по ним работаем.

                Так что никаких подробностей вычислительных задач не будет. Их слишком много. Реально много и они могут быть реально большими - я работал с ТЗ по 200-300 страниц и делал системы где одних таблиц (не считая индексов) 20+ штук, плюс еще несколько десятков программных объектов самого разного функционала.

                А бывают задачки где все сводится к тому, что нужно сделать одну таблицу с парой индексов и внести исправления в пару уже существующих модулей (например, сделать витрину текущих значений даты актуализации + ее обновление + ретривер по ней потому что сама таблицы ДА - историческая, а ДА запрашивается из 100500 других модулей и поддержка витрины обходится дешевле чем каждый раз искать по исторической таблице значение с последней датой изменения - когда это дергается 25млн раз подряд только в рамках одной задачи это слишком накладно).

                Серьезно разговаривать про все вот это можно с тем, кто знает и понимает специфику и варился в этой кухне. А с предложениями типа "перевести все на микросервисы и переписать на современный технологический стек" - это не ко мне.


  1. SlFed
    08.12.2023 05:38
    +1

    Однако при отсортированном порядке инвестиций мы видим либо, практически, неизменность либо увеличение скорости вычисления.

    Если я правильно понял, вы пришли к использованию распределения задач между ядрами по "жадному алгоритму". Сначала распределяете на разные ядра самые тяжелые потоки, а потом освободившиеся загружаете более легкими и так пока пул задач не опустеет.

    А как решается проблема, если новая задача появляется пока текущий пул еще не закончился ? Создается новый список задач ? Или пересортировывается существующий ?


    1. SpiderEkb
      08.12.2023 05:38

      Я не особо вникал в код (просто java не обучен), но как можно предсказать загруженность потока для алгоритма

      Сначала распределяете на разные ядра самые тяжелые потоки, а потом освободившиеся загружаете более легкими и так пока пул задач не опустеет.

      Как упоминалось выше, "берем инвестора и отправляем его на обработку". Но инвесторов там (условно) несколько тысяч. У одних данных мало, у других много. Т.е. на старте мы что-то можем посмотреть (первые 10 (например) инвесторов), а дальше? Вот у нас 11-й инвестор. У него много данных - он "тяжелый". А первым освободился поток, который обрабатывал "легкого" инвестора - он был "распределен по остаточному принципу". А теперь в него пришел "тяжелый" инвестор. И вся эта "жадная логика" пошла наперекосяк.

      Или предполагается постоянно стартовать новый поток под каждого нового инвестора (и останавливать его после его обработки)? Извините, но это такое себе решение... Запуск потока не бесплатен, с ним связаны некоторые накладные расходы. Зачем их приумножать без нужды? Запустили один раз 10 потоков (все они равноправны, без жадной логики т.к. заранее мы не знаем куда что попадет) и дальше просто подкидываем данные в первый освободившийся. И тут опять решение с конвейером, что я описывал выше) - нам не надо думать кто там освободился, а кто нет - мы просто выкладываем на конвейер данные. А подхватит их уже первый кто свободен (обработал предыдущий блок). Нам только обеспечить раздачу и следить чтобы конвейер не переполнялся (возможно, в каких-то ситуациях притормозить раздачу, добавить поток обработки - что-то типа того).


    1. Borlok Автор
      08.12.2023 05:38

      Здравствуйте! Жадный алгоритм присутствует внутри пула ForkJoinPool, если мы говорим про алгоритм кражи работы. Поправьте, пожалуйста, если это не так. Это его собственная оптимизация в дополнении к той, что пишем мы. 

      Сначала распределяете на разные ядра самые тяжелые потоки, а потом освободившиеся загружаете более легкими и так пока пул задач не опустеет.

      Напротив, я не распределяю самые тяжелые задачи на ядра. Статья о том, что бы показать преимущества равного распределения. И здесь распределяются задачи по типу 1-10, 2-9, 3-8 и тд. Каждый поток в пуле, имеет свою очередь задач, и по приходу новых даных, они добавляются в очередь задач, которая есть у каждого потока в пуле.

      P.S. Тем не менее кое-где в коде есть место, где я написал абсолютно неверное распределение) Кому будет интересно, могут сами придумать правильное распределение)

      В классе AsyncTwoPointerTask методе divide(), мы делаем подобие пула

      if (asPool) { 
        MyPhaser.register(); 
        new AsyncTwoPointerTask(investors.subList(leftStart, leftEnd), threshold, parts, asPool).fork(); 
        new AsyncTwoPointerTask(investors.subList(rightStart, rightEnd), threshold, parts, asPool).fork(); 
      }

      И здесь мы как раз забиваем один поток самыми легкими задачами, второй самыми тяжелыми. А смысл статьи - так не делать)


      1. SpiderEkb
        08.12.2023 05:38

        А не проще сделать один пул (очередь) задач, общую для всех потоков? По аналогии с нашим конвейером. Тогда вам не придется думать о загрузке - она распределится равномерно. Какой-то поток выполнит меньше, но более тяжелых задач, какой-то больше, но более легких.

        Мы когда свою схему отрабатывали, собирали много статистики - суммарные времена работу для каждого обработчика, количество обработанных данных и т.п.

        К сожалению, сейчас нет под рукой статистики по большим объемам данных, но примерно так:

        Количество обработанных данных и общее время работы для каждого обработчика.

        Чем больше данных, тем меньше будет разброс по времени работы. Но даже тут видно, что разница, например, между первым (A0EMR1) и вторым (A0EMR2) обработчиками по данным в два раза, а по времени всего в 2мс.


        1. Borlok Автор
          08.12.2023 05:38

          Пул и так один. Смысл в том, что бы распределить задачи внутри одного пула и оптимизировать его работу. Просто мы с вами говорим о разном уровне абстракции и разных языках программирования. Вы говорите про конвейер, терминаторы, которые в текущем контексте были бы реализованы выше по иерархии. Получается, что по аллегории, в статье мы разбираем буквы из алфавита, а вы - слова)

          Так же специфичная для Java и Fork Join pool, в данном контексте, вещь - это как работает этот фреймворк. Мы создаем пул, у пула есть потоки, которые он распределяет на разные ядра, и при делении задач на количество большее, чем доступно ядер, мы не создаем новые потоки, а помещаем задачи в очередь на исполнение. Это не относится к многопоточному программированию, потому здесь нет Thread's в явном исполнении и блокировок. В данном случае это просто изолированный юнит работы, который мы оптимизируем и разбираем преимущества распределения задач.


          1. SpiderEkb
            08.12.2023 05:38

            Ок. Спасибо. Примерно (ну очень примерно :-) понятно. Вы делаете сами то, что мы просто отдаем на окуп операционке.

            Дело не в разных языках. То, что реализовано у нас, можно и на java написать без проблем. И даже не в платформе - я под винду тоже самое сделаю точно также. Ну не будет у меня "заданий", будут процессы. Не будет у меня *USRQ - будут пайпы (там еще есть mailslot, но для данной задачи он неудобен). Или локальные сокеты (например, UDP сокет на каком-то порту localhost - на винде нет именованных UNIX-sockets - вполне себе подойдет под данную задачу).

            Но я давно уже исхожу из того, что мой процесс не является единственным работающим на данной машине. И раскидывать потоки по ядрам, опираясь только на потребности своего процесса чревато тем, что этим же занимается и сама система, но в более глобальном масштабе. Т.е. мы запросто можем навесить какой-то свой загруженный поток на то ядро, которое система решили отдать под какой-то совершенно неизвестный нам процесс с достаточно высокой степенью утилизации этого самого ядра. И в результате получим лютую фигню.

            Из вышесказанного - я все-таки предпочту отдать задачу распределения потоков по ядрам системе - у нее гораздо больше информации для того, чтобы сделать это оптимально для всей машины целиком и не попасть в ситуацию когда моя задача не только тормозит сама, но и мешает нормально работать другим.

            А вообще все поверяется практикой. Нужно просто реализовать разными способами одну и ту же реальную задачу и посмотреть что получится в том или ином случае. По трудоемкости, скорости работы, эффективности использования ресурсов машины.