В данной статье приводится пример использования Akka в Spring-приложении, что представляет некоторую сложность, поскольку в силу ее особенностей, акторов нельзя создавать посредством простого вызова new.
Создадим вначале некую модельную проблему
Сгенерируем обычное spring-boot приложение:
@SpringBootApplication
public class AkkaProjectApplication {
public static void main(String[] args) {
SpringApplication.run(AkkaProjectApplication.class, args);
}
}
С тестом на то, что контекст стартует без проблем:
@RunWith(SpringRunner.class)
@SpringBootTest
public class AkkaProjectApplicationTests {
@Test
public void contextLoads() {
}
}
Предположим, что у нас есть некий внешний сервис ExternalService с довольно-таки небыстрой операцией:
@Service
public class ExternalServiceFakeImpl implements ExternalService {
@Value("${delay.base:300}")
long delayBase;
@Value("${delay.spread:300}")
long delaySpread;
@Override
public ServiceResponse timeConsumingOperation(ServiceRequest request) {
try {
sleep(delayBase + (int) (delaySpread * random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
return buildServiceResponse(SUCCESS);
}
private ServiceResponse buildServiceResponse(StatusCodeType statusCode) {
ServiceResponse result = new ServiceResponse();
result.setStatusCode(statusCode);
return result;
}
}
И нам требуется доставать данные из некоторого хранилища и отправлять на обработку в этот небыстрый сервис. Данные предоставляет DataProvider:
@Component
public class DataProvider implements Dao {
@Override
public List<DataItem> retrieveItems(int maxSize) {
List<DataItem> result = new ArrayList<>();
for (int i = 0; i < maxSize * random(); i++) {
result.add(buildDataItem());
}
return result;
}
private DataItem buildDataItem() {
DataItem dataItem = new DataItem();
dataItem.setTime(LocalDateTime.now());
dataItem.setValue(random());
return dataItem;
}
}
Где класс DataItem используется как контейнер данных:
public class DataItem {
private LocalDateTime time;
private Double value;
public LocalDateTime getTime() {
return time;
}
public void setTime(LocalDateTime time) {
this.time = time;
}
public Double getValue() {
return value;
}
public void setValue(Double value) {
this.value = value;
}
}
При условии, что инстансов ExternalService у нас много/несколько, и они стоят за балансером, есть смысл слать запросы в него в несколько потоков. Собственно для этого и используем Akka
Подготовка инфраструктуры для использования Akka в Spring-приложении
Добавляем нужные зависимости:
<properties>
<akka.version>2.5.4</akka.version>
</properties>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<version>${akka.version}</version>
</dependency>
Для интеграции фреймворка со Spring, необходимо использовать свой класс SpringActorProducer, имплементирующий IndirectActorProducer:
public class SpringActorProducer implements IndirectActorProducer {
private final ApplicationContext applicationContext;
private final String actorBeanName;
public SpringActorProducer(ApplicationContext applicationContext,
String actorBeanName) {
this.applicationContext = applicationContext;
this.actorBeanName = actorBeanName;
}
@Override
public Actor produce() {
return (Actor) applicationContext.getBean(actorBeanName);
}
@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) applicationContext.getType(actorBeanName);
}
}
а также SpringExtension, имплементирующий Extension:
@Component
public class SpringExtension implements Extension {
private ApplicationContext applicationContext;
public void initialize(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public Props props(String actorBeanName) {
return Props.create(SpringActorProducer.class, applicationContext,
actorBeanName);
}
}
Для создания Actor system, создадим отдельный конфиг AkkaConfig:
@Configuration
@Lazy
public class AkkaConfig {
@Autowired
ApplicationContext applicationContext;
@Autowired
SpringExtension springExtension;
@Bean
public ActorSystem actorSystem() {
ActorSystem system = ActorSystem.create("KMT", akkaConfiguration());
// Initialize the application context in the Akka Spring Extension
springExtension.initialize(applicationContext);
return system;
}
/**
* Read configuration from application.conf file
*/
@Bean
public Config akkaConfiguration() {
return ConfigFactory.load();
}
}
# Loggers to register at boot time (akka.event.Logging$DefaultLogger logs to STDOUT)
loggers = [«akka.event.slf4j.Slf4jLogger»]
# Log level used by the configured loggers (see «loggers») as soon
# as they have been started; before that, see «stdout-loglevel»
# Options: OFF, ERROR, WARNING, INFO, DEBUG
loglevel = «INFO»
# Log level for the very basic logger activated during ActorSystem startup.
# This logger prints the log messages to stdout (System.out).
# Options: OFF, ERROR, WARNING, INFO, DEBUG
stdout-loglevel = «INFO»
}
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{ISO8601} %-5level %X{akkaSource} - %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
{akkaSource} позволяет видеть в логах, какой именно поток делает конкретное действие
Теперь акторы в сприн-бинах можем создавать при помощи props метода у SpringExtension. Это и сделаем в методе, аннотированном @PostConstruct в классе Scheduler, который достает записи при помощи DataProvider и шлет акторам:
@Component
public class Scheduler {
private static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);
private final long DELAY = 10_000;
@Value("${scheduler.batchSize:5}")
int batchSize;
@Autowired
ApplicationContext context;
@Autowired
Dao dataProvider;
ActorRef migrationActor;
@Scheduled(fixedDelay = DELAY)
public void performRegularAction() {
LOGGER.info("---------------------------------------------------");
LOGGER.info("Try to retrieve {} records from DB...", batchSize);
List<DataItem> dataItems = dataProvider.retrieveItems(batchSize);
LOGGER.info("{} records retrieved", dataItems.size());
int itemNumber = 0;
for (DataItem dataItem : dataItems) {
LOGGER.info("Send to actor item №{}...", itemNumber++);
migrationActor.tell(new MigrationActor.Send(dataItem),
ActorRef.noSender());
}
}
@PostConstruct
public void postConstructMethod() {
ActorSystem system = context.getBean(ActorSystem.class);
SpringExtension springExtension = context.getBean(SpringExtension.class);
// Use the Spring Extension to create props for a named actor bean
migrationActor = system.actorOf(
springExtension.props("migrationActor")
.withRouter(new RoundRobinPool(4)));
}
}
Метод postConstructMethod создает reference на migrationActor, сообщения к которому будут попадать через роутер RoundRobinPool длиной 4, в итоге одновременно будут работать 4 инстанса MigrationActor, каждый в своем потоке. В итоге сообщения в коде шлем одному и тому же объекту — а роутер раскидывает их по разным инстансам
Пишем тест для Scheduler-а (извините, на JMock):
public class SchedulerTest {
@org.junit.Rule
public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
}};
Scheduler scheduler;
static ActorSystem system;
static TestProbe probe;
@BeforeClass
public static void setup() {
system = ActorSystem.create("TestSystem");
probe = TestProbe.apply(system);
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@SuppressWarnings("unchecked")
@Before
public void setUp() {
scheduler = new Scheduler();
scheduler.dataProvider = mockery.mock(Dao.class);
scheduler.migrationActor = probe.ref();
scheduler.batchSize = 5;
}
@Test
public void performRegularAction() throws Exception {
DataItem dataItem = new DataItem();
List<DataItem> dataItems = Arrays.asList(dataItem);
mockery.checking(new Expectations() {
{
oneOf(scheduler.dataProvider).retrieveItems(scheduler.batchSize);
will(returnValue(dataItems));
}
});
scheduler.performRegularAction();
MigrationActor.Send send = probe.expectMsgClass(MigrationActor.Send.class);
assertThat("Wrong message in Send", send.dataItem, is(dataItem));
}
}
Наконец описываем сам актор:
@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class MigrationActor extends AbstractActor {
private final LoggingAdapter LOGGER =
Logging.getLogger(getContext().getSystem(), this);
@Autowired
@Qualifier("externalServiceFakeImpl")
ExternalService externalService;
public MigrationActor() {
}
public MigrationActor(ExternalService externalService) {
this.externalService = externalService;
}
public static Props props(ExternalService externalService) {
return Props.create(MigrationActor.class, externalService);
}
@Override
public void preStart() throws Exception {
super.preStart();
LOGGER.info("Migration actor started");
}
@Override
public void postStop() throws Exception {
LOGGER.info("Migration actor stopped");
super.postStop();
}
public static class Send {
public final DataItem dataItem;
public Send(DataItem dataItem) {
this.dataItem = dataItem;
}
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Send.class, this::onSend)
.build();
}
private void onSend(Send send) {
LOGGER.info("Start call service by onSend...");
ServiceRequest request = new ServiceRequest();
request.setDataItem(send.dataItem);
externalService.timeConsumingOperation(request);
LOGGER.info("Finish call to service by onSend");
}
}
На что здесть стоит обратить внимание:
— актор имеет scope=prototype, поскольку стандартный scope=singleton не подходит для стратегии «актор-на-поток»
— как создается логгер
— классы месседжей, передаваемые актору, определяются прямо внутри него, т.к. по сути это — часть его контракта. Вообще — месседжи, как используемые для обмена между потоками, должны быть immutable
— определяем статический метод props, чтобы создавать Акторы, используя Props.create(). Это понадобится для теста на актор
— как следствие — нам приходится определить конструктор с сервисом в виде параметра
— т.к. акторы создаются как спринг-бины, а конструктор с параметрами уже есть — также объявляем и пустой конструктор
— наличие методов preStart и postStop, где можно отследить моменты старта/запуска актора
— метод createReceive, где собственно и происходит матчинг получаемых сообщений на методы, которые вызываются как реакция на сообщения
Пишем тест для актора:
public class MigrationActorTest {
@org.junit.Rule
public final JUnitRuleMockery mockery = new JUnitRuleMockery() {{
setImposteriser(ClassImposteriser.INSTANCE);
setThreadingPolicy(new Synchroniser());
}};
static ActorSystem system;
@BeforeClass
public static void setup() {
system = ActorSystem.create("TestSystem");
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
}
@Test
public void testMigrationActorSend() {
ExternalService externalService = mockery.mock(ExternalService.class);
DataItem dataItem = new DataItem();
mockery.checking(new Expectations() {
{
oneOf(externalService).timeConsumingOperation(with(allOf(
any(ServiceRequest.class),
hasProperty("dataItem", is(dataItem))
)));
}
});
TestActorRef<MigrationActor> ref = TestActorRef.create(system,
MigrationActor.props(externalService), "migrationActor");
ref.tell(new MigrationActor.Send(dataItem), ActorRef.noSender());
}
}
Тут понадобилось для JUnitRuleMockery указать ThreadingPolicy, т.к. иначе mockery была бы non-ThreadSafe.
Вот вкратце и все. Стоит только заметить, что в случае необходимости также можно передавать результаты работы одного актора — другому или самому отправителю, если он — тоже актор. Для тестирования таких акторов стоит использовать класс TestKit
Исходный код приведен здесь
Комментарии (5)
yahorfilipcyk
11.09.2017 03:05Спасибо за статью. Хотел только добавить/поправить насчет параллелизма и настроек.
Строго говоря, "актор-на-поток" не является характеристикой Akka. Не уверен даже, можно ли это гарантировать. Akka лишь гарантирует синхронность обработки сообщений внутри актора, т.е. если клиент оправил конкретному актору N сообщений, порядок их получения и обработки актором гарантирован. Параллелизм же относительно других акторов и системы в целом определяется используемым "диспетчером" (фактически — пул потоков). Диспетчер присваивается каждому актору, и, если диспетчер не настроен для конкретного актора, используется дефолтный, который имеет под капотом
ForkJoinPool
, размер которого зависит, в том числе, от количества процессоров на машине. Короче говоря, может получиться так, что у вас меньше потоков в пуле потоков, чем количество акторов в пуле акторов. Особенно имеет смысл настроить отдельный диспетчер для акторов, которые выполняют долгие блокирующие операции, как в вашем примере, чтобы не загружать всю систему.
Akka очень хорошо настраиваема. В том числе, можно переместить конфигурацию конкретного роутера в файл конфигурации, что дает больше гибкости в настройке количества акторов в пуле, типа роутера, диспетчера и т.д.
AstarothAst
11.09.2017 11:55Каждый раз, когда вижу Акку приходит мысль, что это «эрланг для тех, кто не хочет писать на эрланг». Не знаю насколько это верно в действительности…
yahorfilipcyk
11.09.2017 20:12Ну это ж на JVM всё-таки. Akka может быть лишь частью сложной системы. Да и сама Akka достаточно большая. Например, поддержка кластера позволяет строить сложные распределённые системы. Акторы лишь часть всего этого, хоть и фундаментальная.
time2rfc
На java, это конечно все кажеться несколько более монструозно чем на scala.
Спасибо за статью, но боюсь что многие читатели не до конца поняли пользу акторного подхода на этом примере.
andd3dfx Автор
Спасибо за спасибо
Да, видимо насчет нюансов самой Akka должна быть написана другая статья. Т.к. в этой в основном концентрировались на ее интеграции со Spring