Фреймворк Akka позволяет реализовать многопоточность в Java-приложении, используя концепцию акторов, взаимодействующих посредством посылки друг другу сообщений. Создав несколько копий акторов одного и того же типа, мы можем таким образом распределить нагрузку в приложении между несколькими потоками.

В данной статье приводится пример использования Akka в Spring-приложении, что представляет некоторую сложность, поскольку в силу ее особенностей, акторов нельзя создавать посредством простого вызова new.

Создадим вначале некую модельную проблему

Сгенерируем обычное spring-boot приложение:

AkkaProjectApplication
@SpringBootApplication
public class AkkaProjectApplication {
    public static void main(String[] args) {
        SpringApplication.run(AkkaProjectApplication.class, args);
    }
}


С тестом на то, что контекст стартует без проблем:

AkkaProjectApplicationTests
@RunWith(SpringRunner.class)
@SpringBootTest
public class AkkaProjectApplicationTests {
    @Test
    public void contextLoads() {
    }
}


Предположим, что у нас есть некий внешний сервис ExternalService с довольно-таки небыстрой операцией:

ExternalServiceFakeImpl
@Service
public class ExternalServiceFakeImpl implements ExternalService {
    @Value("${delay.base:300}")
    long delayBase;

    @Value("${delay.spread:300}")
    long delaySpread;

    @Override
    public ServiceResponse timeConsumingOperation(ServiceRequest request) {
        try {
            sleep(delayBase + (int) (delaySpread * random()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return buildServiceResponse(SUCCESS);
    }

    private ServiceResponse buildServiceResponse(StatusCodeType statusCode) {
        ServiceResponse result = new ServiceResponse();
        result.setStatusCode(statusCode);
        return result;
    }
}


И нам требуется доставать данные из некоторого хранилища и отправлять на обработку в этот небыстрый сервис. Данные предоставляет DataProvider:

DataProvider
@Component
public class DataProvider implements Dao {
    @Override
    public List<DataItem> retrieveItems(int maxSize) {
        List<DataItem> result = new ArrayList<>();
        for (int i = 0; i < maxSize * random(); i++) {
            result.add(buildDataItem());
        }
        return result;
    }

    private DataItem buildDataItem() {
        DataItem dataItem = new DataItem();
        dataItem.setTime(LocalDateTime.now());
        dataItem.setValue(random());
        return dataItem;
    }
}


Где класс DataItem используется как контейнер данных:

DataItem
public class DataItem {
    private LocalDateTime time;
    private Double value;

    public LocalDateTime getTime() {
        return time;
    }

    public void setTime(LocalDateTime time) {
        this.time = time;
    }

    public Double getValue() {
        return value;
    }

    public void setValue(Double value) {
        this.value = value;
    }
}


При условии, что инстансов ExternalService у нас много/несколько, и они стоят за балансером, есть смысл слать запросы в него в несколько потоков. Собственно для этого и используем Akka

Подготовка инфраструктуры для использования Akka в Spring-приложении

Добавляем нужные зависимости:

Akka зависимости
<properties>
  <akka.version>2.5.4</akka.version>
</properties>

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-testkit_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-slf4j_2.12</artifactId>
  <version>${akka.version}</version>
</dependency>


Для интеграции фреймворка со Spring, необходимо использовать свой класс SpringActorProducer, имплементирующий IndirectActorProducer:

SpringActorProducer
public class SpringActorProducer implements IndirectActorProducer {
    private final ApplicationContext applicationContext;
    private final String actorBeanName;

    public SpringActorProducer(ApplicationContext applicationContext, 
    String actorBeanName) {
        this.applicationContext = applicationContext;
        this.actorBeanName = actorBeanName;
    }

    @Override
    public Actor produce() {
        return (Actor) applicationContext.getBean(actorBeanName);
    }

    @Override
    public Class<? extends Actor> actorClass() {
        return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
    }
}


а также SpringExtension, имплементирующий Extension:

SpringExtension
@Component
public class SpringExtension implements Extension {
    private ApplicationContext applicationContext;

    public void initialize(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }

    public Props props(String actorBeanName) {
        return Props.create(SpringActorProducer.class, applicationContext, 
        actorBeanName);
    }
}


Для создания Actor system, создадим отдельный конфиг AkkaConfig:

AkkaConfig
@Configuration
@Lazy
public class AkkaConfig {
    @Autowired
    ApplicationContext applicationContext;

    @Autowired
    SpringExtension springExtension;

    @Bean
    public ActorSystem actorSystem() {
        ActorSystem system = ActorSystem.create("KMT", akkaConfiguration());

        // Initialize the application context in the Akka Spring Extension
        springExtension.initialize(applicationContext);
        return system;
    }

    /**
     * Read configuration from application.conf file
     */
    @Bean
    public Config akkaConfiguration() {
        return ConfigFactory.load();
    }
}

С конфигом
akka {
# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs to STDOUT)
loggers = [«akka.event.slf4j.Slf4jLogger»]

# Log level used by the configured loggers (see «loggers») as soon
# as they have been started; before that, see «stdout-loglevel»
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = «INFO»

# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = «INFO»
}

И логгинг конфигурацией
<configuration>
  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
    <encoder>
      <pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
    </encoder>
  </appender>

  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>
</configuration>

{akkaSource} позволяет видеть в логах, какой именно поток делает конкретное действие

Теперь акторы в сприн-бинах можем создавать при помощи props метода у SpringExtension. Это и сделаем в методе, аннотированном @PostConstruct в классе Scheduler, который достает записи при помощи DataProvider и шлет акторам:

Scheduler
@Component
public class Scheduler {
    private static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);
    private final long DELAY = 10_000;

    @Value("${scheduler.batchSize:5}")
    int batchSize;

    @Autowired
    ApplicationContext context;

    @Autowired
    Dao dataProvider;

    ActorRef migrationActor;

    @Scheduled(fixedDelay = DELAY)
    public void performRegularAction() {
        LOGGER.info("---------------------------------------------------");
        LOGGER.info("Try to retrieve {} records from DB...", batchSize);

        List<DataItem> dataItems = dataProvider.retrieveItems(batchSize);
        LOGGER.info("{} records retrieved", dataItems.size());

        int itemNumber = 0;
        for (DataItem dataItem : dataItems) {
            LOGGER.info("Send to actor item №{}...", itemNumber++);

            migrationActor.tell(new MigrationActor.Send(dataItem),
            ActorRef.noSender());
        }
    }

    @PostConstruct
    public void postConstructMethod() {
        ActorSystem system = context.getBean(ActorSystem.class);
        SpringExtension springExtension = context.getBean(SpringExtension.class);

        // Use the Spring Extension to create props for a named actor bean
        migrationActor = system.actorOf(
                springExtension.props("migrationActor")
                .withRouter(new RoundRobinPool(4)));
    }
}


Метод postConstructMethod создает reference на migrationActor, сообщения к которому будут попадать через роутер RoundRobinPool длиной 4, в итоге одновременно будут работать 4 инстанса MigrationActor, каждый в своем потоке. В итоге сообщения в коде шлем одному и тому же объекту — а роутер раскидывает их по разным инстансам

Пишем тест для Scheduler-а (извините, на JMock):

SchedulerTest
public class SchedulerTest {
    @org.junit.Rule
    public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
        setImposteriser(ClassImposteriser.INSTANCE);
    }};

    Scheduler scheduler;

    static ActorSystem system;
    static TestProbe probe;

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestSystem");
        probe = TestProbe.apply(system);
    }

    @AfterClass
    public static void tearDown() {
        TestKit.shutdownActorSystem(system);
        system = null;
    }

    @SuppressWarnings("unchecked")
    @Before
    public void setUp() {
        scheduler = new Scheduler();

        scheduler.dataProvider = mockery.mock(Dao.class);
        scheduler.migrationActor = probe.ref();
        scheduler.batchSize = 5;
    }

    @Test
    public void performRegularAction() throws Exception {
        DataItem dataItem = new DataItem();
        List<DataItem> dataItems = Arrays.asList(dataItem);

        mockery.checking(new Expectations() {
            {
                oneOf(scheduler.dataProvider).retrieveItems(scheduler.batchSize);
                will(returnValue(dataItems));
            }
        });

        scheduler.performRegularAction();

        MigrationActor.Send send = probe.expectMsgClass(MigrationActor.Send.class);
        assertThat("Wrong message in Send", send.dataItem, is(dataItem));
    }
}


Наконец описываем сам актор:

MigrationActor
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MigrationActor extends AbstractActor {
    private final LoggingAdapter LOGGER = 
    Logging.getLogger(getContext().getSystem(), this);

    @Autowired
    @Qualifier("externalServiceFakeImpl")
    ExternalService externalService;

    public MigrationActor() {
    }

    public MigrationActor(ExternalService externalService) {
        this.externalService = externalService;
    }

    public static Props props(ExternalService externalService) {
        return Props.create(MigrationActor.class, externalService);
    }

    @Override
    public void preStart() throws Exception {
        super.preStart();
        LOGGER.info("Migration actor started");
    }

    @Override
    public void postStop() throws Exception {
        LOGGER.info("Migration actor stopped");
        super.postStop();
    }

    public static class Send {
        public final DataItem dataItem;

        public Send(DataItem dataItem) {
            this.dataItem = dataItem;
        }
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Send.class, this::onSend)
                .build();
    }

    private void onSend(Send send) {
        LOGGER.info("Start call service by onSend...");

        ServiceRequest request = new ServiceRequest();
        request.setDataItem(send.dataItem);
        externalService.timeConsumingOperation(request);

        LOGGER.info("Finish call to service by onSend");
    }
}


На что здесть стоит обратить внимание:

— актор имеет scope=prototype, поскольку стандартный scope=singleton не подходит для стратегии «актор-на-поток»
— как создается логгер
— классы месседжей, передаваемые актору, определяются прямо внутри него, т.к. по сути это — часть его контракта. Вообще — месседжи, как используемые для обмена между потоками, должны быть immutable
— определяем статический метод props, чтобы создавать Акторы, используя Props.create(). Это понадобится для теста на актор
— как следствие — нам приходится определить конструктор с сервисом в виде параметра
— т.к. акторы создаются как спринг-бины, а конструктор с параметрами уже есть — также объявляем и пустой конструктор
— наличие методов preStart и postStop, где можно отследить моменты старта/запуска актора
— метод createReceive, где собственно и происходит матчинг получаемых сообщений на методы, которые вызываются как реакция на сообщения

Пишем тест для актора:

MigrationActorTest
public class MigrationActorTest {
    @org.junit.Rule
    public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
        setImposteriser(ClassImposteriser.INSTANCE);
        setThreadingPolicy(new Synchroniser());
    }};

    static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestSystem");
    }

    @AfterClass
    public static void tearDown() {
        TestKit.shutdownActorSystem(system);
        system = null;
    }

    @Test
    public void testMigrationActorSend() {
        ExternalService externalService = mockery.mock(ExternalService.class);
        DataItem dataItem = new DataItem();

        mockery.checking(new Expectations() {
            {
                oneOf(externalService).timeConsumingOperation(with(allOf(
                        any(ServiceRequest.class),
                        hasProperty("dataItem", is(dataItem))
                )));
            }
        });

        TestActorRef<MigrationActor> ref = TestActorRef.create(system, 
        MigrationActor.props(externalService), "migrationActor");

        ref.tell(new MigrationActor.Send(dataItem), ActorRef.noSender());
    }
}


Тут понадобилось для JUnitRuleMockery указать ThreadingPolicy, т.к. иначе mockery была бы non-ThreadSafe.

Вот вкратце и все. Стоит только заметить, что в случае необходимости также можно передавать результаты работы одного актора — другому или самому отправителю, если он — тоже актор. Для тестирования таких акторов стоит использовать класс TestKit

Исходный код приведен здесь

Комментарии (5)


  1. time2rfc
    10.09.2017 23:21
    +2

    На java, это конечно все кажеться несколько более монструозно чем на scala.
    Спасибо за статью, но боюсь что многие читатели не до конца поняли пользу акторного подхода на этом примере.


    1. andd3dfx Автор
      11.09.2017 00:25

      Спасибо за спасибо
      Да, видимо насчет нюансов самой Akka должна быть написана другая статья. Т.к. в этой в основном концентрировались на ее интеграции со Spring


  1. yahorfilipcyk
    11.09.2017 03:05

    Спасибо за статью. Хотел только добавить/поправить насчет параллелизма и настроек.


    Строго говоря, "актор-на-поток" не является характеристикой Akka. Не уверен даже, можно ли это гарантировать. Akka лишь гарантирует синхронность обработки сообщений внутри актора, т.е. если клиент оправил конкретному актору N сообщений, порядок их получения и обработки актором гарантирован. Параллелизм же относительно других акторов и системы в целом определяется используемым "диспетчером" (фактически — пул потоков). Диспетчер присваивается каждому актору, и, если диспетчер не настроен для конкретного актора, используется дефолтный, который имеет под капотом ForkJoinPool, размер которого зависит, в том числе, от количества процессоров на машине. Короче говоря, может получиться так, что у вас меньше потоков в пуле потоков, чем количество акторов в пуле акторов. Особенно имеет смысл настроить отдельный диспетчер для акторов, которые выполняют долгие блокирующие операции, как в вашем примере, чтобы не загружать всю систему.


    Akka очень хорошо настраиваема. В том числе, можно переместить конфигурацию конкретного роутера в файл конфигурации, что дает больше гибкости в настройке количества акторов в пуле, типа роутера, диспетчера и т.д.


  1. AstarothAst
    11.09.2017 11:55

    Каждый раз, когда вижу Акку приходит мысль, что это «эрланг для тех, кто не хочет писать на эрланг». Не знаю насколько это верно в действительности…


    1. yahorfilipcyk
      11.09.2017 20:12

      Ну это ж на JVM всё-таки. Akka может быть лишь частью сложной системы. Да и сама Akka достаточно большая. Например, поддержка кластера позволяет строить сложные распределённые системы. Акторы лишь часть всего этого, хоть и фундаментальная.