
At Skiddoo, we use the Play Framework for our REST API. Before building it, no one on the team had worked with this framework. But the reactive and stateless orientation of the framework made us believe it was the best choice if we wanted to build a lightweight and scalable application. Because we all come from a Java background, we naturally chose the Java version of the framework.

I could say a lot of good things about Play (JSON as a first-class citizen, asynchronous I/O...) and some annoying things too, but today is about Akka. So why am I talking about Play? Because Play is built on top of Akka (and other technologies). That makes Akka a potential first-class citizen for every application using Play.

Until recently, we were only using Akka for its scheduler:

Akka.system().scheduler()

It was a solution for us to schedule some background tasks to be run repeatedly at a particular frequency (some of those tasks should have probably been a different independent service, but that’s another story).
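As a sketch of what that looked like (the task and the timings here are made up for illustration; `Akka.system()` is Play's handle to the underlying ActorSystem):

```java
// Illustrative sketch: schedule a hypothetical cleanup task to run every
// 30 minutes, starting 10 seconds after boot. The Runnable and the timings
// are assumptions, not our actual tasks.
Akka.system().scheduler().schedule(
    Duration.create(10, TimeUnit.SECONDS),          // initial delay
    Duration.create(30, TimeUnit.MINUTES),          // repeat interval
    () -> System.out.println("running cleanup..."), // the background task
    Akka.system().dispatcher());                    // execution context to run it on
```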

And recently, we decided our API was doing too many things; it was time to refactor some services and pull them out of the Play application. As a start, we pulled out our tracking system (it tracks all the flight searches made through our API to provide statistics to other teams, marketing for example).

I will assume you've heard a little bit about Akka and have a general idea of what it is about. My main goal was a service I could start/stop without impacting the other services communicating with it, and without losing any information.

I knew it was possible through Akka, because every software engineer who has heard about Akka has probably read the official description of it:

Akka is a toolkit and runtime for building highly concurrent, distributed, and resilient message-driven applications on the JVM.

But at this point, I didn’t know how it was possible through Akka.

The idea was very simple: every time a search request comes into our API, a message containing the information about the search is sent to this new service. In the world of Akka, it's just about sending a message to an Actor, and this Actor is responsible for processing/storing it somewhere. We called this new service the skiddoo-aggregator.

Simple concept but we had multiple constraints:

  1. skiddoo-aggregator should be able to run on any server, anywhere, and its location should be transparent
  2. If skiddoo-aggregator is down, messages should not be lost, and the API should retry sending them at a particular frequency
  3. If the JVM crashes, messages which have not been sent (by the API) or processed (by skiddoo-aggregator) should not be lost, and the system should be able to recover on restart

So what is a message? It's a simple immutable POJO:

package au.com.skiddoo.aggregator.message;

import java.time.LocalDate;

public class SearchMessage {
  private final LocalDate date;
  private final String from;
  private final String to;

  public SearchMessage(LocalDate date, String from, String to) {
    this.date = date;
    this.from = from;
    this.to = to;
  }

  public LocalDate getDate() {
    return date;
  }

  public String getFrom() {
    return from;
  }

  public String getTo() {
    return to;
  }
}

A very rough diagram of the architecture:

[Diagram: searches flow from the API's ClientAggregatorActor to the SupervisorAggregatorActor in skiddoo-aggregator]

We have an Actor on the API side, and every time a search comes in, we send information about this search to the local actor, ClientAggregatorActor. This actor then forwards the information to skiddoo-aggregator, and more precisely to the SupervisorAggregatorActor, which is our "main" actor in the skiddoo-aggregator service. I'll talk about this guy later.

You might wonder, why do we need an Actor on the API side? Can't we just send the search information directly to skiddoo-aggregator? It's true, we could. A message to an actor doesn't have to be sent from another actor; in that case we would use ActorRef.noSender() as the sender of the message. But we need to send the message from an Actor to achieve the resiliency and self-healing constraints of the system. I'll come back to this in the section about recovering and how to not lose your data.

Now that we had the foundation, I had to discover how to achieve the 3 constraints (transparent location, process acknowledgement and recovery).

Transparent location

In Akka, when an actor sends a message to another actor, it is not aware of any location; everything goes through an ActorRef object, which represents a reference to a local Actor (on the same JVM) or a remote Actor (in a different JVM, on another server or not). So location transparency basically comes for free. What doesn't come for free is communication between different JVMs.

Remote communication comes with an additional module of Akka: akka-remote.

"com.typesafe.akka" %% "akka-remote" % "2.3.11"

For the Actors themselves, nothing really changes. It doesn’t matter if you build a local or a remote Actor, they are built the same way. What does change is the format of your message and the configuration of your Akka application.

Messages have to go through the network, so there is obviously a serialization/deserialization mechanism in between. By default, Akka uses the Java serialization system. You could write your own system and ask Akka to use it, but that's not the point here, and we're pretty happy with the default system so far.

But that means the message has to implement Serializable and provide a serialVersionUID. And it has a cost; from Effective Java (Joshua Bloch):

A major cost of implementing Serializable is that it decreases the flexibility to change a class’s implementation once it has been released.

So walk down the path of serialization, but be aware of the constraints it puts on your objects. A more flexible way could be to implement a custom serializer based on the JSON format (with Gson, for example).
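To make the mechanism concrete, here is a minimal round-trip through Java serialization, which Akka uses by default. The `Message` class is a stand-in for our `SearchMessage`, reduced to two fields:

```java
import java.io.*;

public class SerializationDemo {
  // Stand-in for SearchMessage: Serializable with an explicit serialVersionUID.
  static class Message implements Serializable {
    private static final long serialVersionUID = 1L;
    final String from;
    final String to;
    Message(String from, String to) { this.from = from; this.to = to; }
  }

  public static void main(String[] args) throws Exception {
    Message original = new Message("SYD", "LAX");

    // Serialize to bytes, as Akka would before putting the message on the wire.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(original);
    }

    // Deserialize on the "receiving" side and check the fields survived.
    try (ObjectInputStream in = new ObjectInputStream(
        new ByteArrayInputStream(bytes.toByteArray()))) {
      Message copy = (Message) in.readObject();
      System.out.println(copy.from + "->" + copy.to);
    }
  }
}
```

The serialized bytes embed the class structure, which is exactly why changing the class later becomes painful.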

Regarding the configuration side, it's pretty straightforward. If you want your actors to send/receive remote messages, you just need to enable remote communication by setting a hostname, a port, the type of transport protocol (TCP by default) and the RemoteActorRefProvider:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "192.168.0.1"
      port = 5150
    }
  }
}

The RemoteActorRefProvider allows actors to be started on a remote node and creates references representing them (by default the provider is LocalActorRefProvider, which only works for communication local to a JVM).

This configuration needs to be set up on each application willing to communicate remotely with other actors. But at Skiddoo, we have multiple instances of our API running at the same time, all of them behind a load balancer. All the instances share the same remote configuration file through the -Dconfig.url option on startup. It makes things much easier when we want to update the configuration: we don't have to go through each box, everything is centralized.

So, if we share the same configuration, how can we set up the akka-remote hostname, which is obviously different for each application? The solution was pretty easy: we just need to leave the hostname as an empty string:

netty.tcp {
  hostname = ""
  port = 0
}

By doing this, Akka will use InetAddress.getLocalHost().getHostAddress() as the hostname. Also, setting the port to 0 will pick a random available port, which is very useful to avoid conflicts. But the port trick only works if your application is the one initiating the conversation between actors. If your application is supposed to be a service used by others, you need to pick a predefined port and share the hostname:port combination with the other services willing to use it.

At this stage, our API was ready to communicate with skiddoo-aggregator, we just needed to get a reference to the remote actor and start sending messages:

String path = "akka.tcp://SkiddooAggregatorSystem@192.168.0.2:5150/user/SupervisorAggregator";
ActorRef aggregator = Akka.system().actorFor(path);
aggregator.tell(new SearchMessage(LocalDate.now(), "SYD", "LAX"), self());

SkiddooAggregatorSystem is the name of the Akka system in skiddoo-aggregator; SupervisorAggregator is the parent of all user actors in the system. It's located right under the guardian actor, and it takes care of the supervision strategy in case of problems in the children actors (should we restart the child actor? resume it like nothing happened? etc.).
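The path itself has a predictable anatomy, which this small sketch takes apart (plain Java, no Akka required; the values are the ones from the example above):

```java
public class ActorPathDemo {
  public static void main(String[] args) {
    // Anatomy of a remote actor path:
    //   protocol :// system-name @ host : port / user / actor-name
    String protocol = "akka.tcp";
    String system   = "SkiddooAggregatorSystem";
    String host     = "192.168.0.2";
    int    port     = 5150;
    String actor    = "SupervisorAggregator";

    // All user-created actors live under the /user guardian.
    String path = String.format("%s://%s@%s:%d/user/%s",
        protocol, system, host, port, actor);
    System.out.println(path);
  }
}
```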

Our supervisor is responsible for dispatching the messages. If a SearchMessage is received, we forward it to the SearchAggregator, which has only one responsibility: process it. We try to make our actors as small as possible and to apply the single responsibility principle to them:

public class SupervisorAggregator extends UntypedActor {
  // Assuming an SLF4J logger; any logging facade works here.
  private static final Logger LOG = LoggerFactory.getLogger(SupervisorAggregator.class);
  private final ActorRef searchAggregator;

  public SupervisorAggregator() {
    this.searchAggregator = getContext().actorOf(Props.create(SearchAggregator.class));
  }

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof SearchMessage) {
      searchAggregator.tell(message, sender());
    } else {
      LOG.warn("Unhandled message in SupervisorAggregator : {}", message);
      unhandled(message);
    }
  }
}

public class SearchAggregator extends UntypedActor {
  private static final Logger LOG = LoggerFactory.getLogger(SearchAggregator.class);

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof SearchMessage) {
      SearchMessage search = (SearchMessage) message;
      // Process the message
    } else {
      LOG.warn("Unhandled message in SearchAggregator : {}", message);
      unhandled(message);
    }
  }
}

It's very clear, easy to maintain and to understand. And now we can update our tracking system without impacting our API. But if skiddoo-aggregator is down, there is no way to detect that a message has not been processed, and no way to retry until it gets processed. When skiddoo-aggregator is down, messages sent by the API are redirected to the dead letter mailbox of the API's Akka system. Once they are in the dead letter mailbox, there is no proper way to send them again.

We could write a custom subscriber and subscribe to the DeadLetter event. But this is bad practice, and according to the documentation itself, this type of subscription should only be used for debugging purposes:

The main use of this facility is for debugging, especially if an actor send does not arrive consistently (where usually inspecting the dead letters will tell you that the sender or recipient was set wrong somewhere along the way).

Retry until it gets the message

We need to detect whether the message has been processed. If that's not the case, we want to send it again until it gets processed. There is a module for this: akka-persistence:

"com.typesafe.akka" %% "akka-persistence-experimental" % "2.3.11"

The philosophy behind it is a little bit different from what I have described so far. It's not about detecting whether a message has been received; it's about waiting for a confirmation response within a certain timeframe, which acknowledges the reception by the service.

If we don't get any response within the timeframe, the message gets sent again. That means we are switching from at-most-once delivery semantics to at-least-once delivery semantics.

In a normal situation, Akka guarantees that your message will be delivered at most once (but maybe not at all); the same message can't be delivered multiple times. With akka-persistence, because it expects a confirmation within a certain timeframe, you can potentially receive the same message multiple times. If your service took too long to send the confirmation, and sent it outside the timeframe, it will receive the message again, because akka-persistence considered that it hadn't been received.

The semantics switch has a cost: unless you have a way to track that you already processed a certain message, you might end up processing the same message twice. In our case, that means we would count two searches (or more) when in reality there was only one search. Knowing that, we decided that the price to pay was not that big for us. We process many thousands of searches per day; if some of them get counted twice, it won't fundamentally change our statistics.
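If double counting did matter, the usual mitigation is to make processing idempotent, for example by remembering which deliveryIds were already handled. A minimal in-memory sketch (it would not survive a restart, so a real version would persist the set somewhere durable):

```java
import java.util.HashSet;
import java.util.Set;

public class DedupDemo {
  // IDs of messages we have already processed.
  private final Set<Long> processed = new HashSet<>();
  private int searchCount = 0;

  // Returns true only the first time a given deliveryId is seen.
  boolean process(long deliveryId) {
    if (!processed.add(deliveryId)) {
      return false; // duplicate redelivery: ignore it
    }
    searchCount++;
    return true;
  }

  public static void main(String[] args) {
    DedupDemo aggregator = new DedupDemo();
    aggregator.process(1L);
    aggregator.process(1L); // redelivered duplicate, not counted again
    aggregator.process(2L);
    System.out.println(aggregator.searchCount);
  }
}
```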

This feature is why we have to use an actor to send a SearchMessage to skiddoo-aggregator. To get the benefit of the at-least-once delivery semantics, the sending actor needs to extend the class AbstractPersistentActorWithAtLeastOnceDelivery from akka-persistence. This is a massive name for a Java class, but it's pretty self-explanatory.

By extending this class, you get access to methods like persist and deliver, which help implement the at-least-once delivery semantics. But the philosophy is a little bit different: instead of having an ActorRef object to send messages to, we have an ActorPath. An actor reference designates a single actor; an actor path represents a name which may or may not be inhabited by an actor. The akka-persistence module chose to use ActorPath instead of ActorRef because the way it works is similar to what an ActorPath represents (message order not preserved, after a crash messages are still delivered to the new actor incarnation, etc.).

When you create an actor which extends AbstractPersistentActorWithAtLeastOnceDelivery, you have to override the receiveCommand() method. Your implementation should take care of persisting and sending the message, using the akka-persistence features provided by AbstractPersistentActorWithAtLeastOnceDelivery:

private final ActorPath destination;

@Override
public PartialFunction<Object, BoxedUnit> receiveCommand() {
  return ReceiveBuilder
      .match(SearchMessage.class, searchMessage -> persist(searchMessage, evt -> updateState(evt)))
      .match(ConfirmDelivery.class, confirmDelivery -> persist(confirmDelivery, evt -> updateState(evt)))
      .build();
}

private void updateState(Object event) {
  if (event instanceof SearchMessage) {
    final SearchMessage evt = (SearchMessage) event;
    deliver(destination, deliveryId -> new SearchMessageWithDeliveryId.Builder()
        .withSearchQuery(evt)
        .withDeliveryId(deliveryId)
        .build());
  } else if (event instanceof ConfirmDelivery) {
    final ConfirmDelivery evt = (ConfirmDelivery) event;
    confirmDelivery(evt.getDeliveryId());
  }
}

This implementation persists the received message in a journal (to recover in case of a JVM crash) and then calls the deliver method. Before sending the message to the ActorPath, deliver provides a deliveryId (a simple long value), which represents a unique ID associated with the message sent. The updateState method transforms the message by encapsulating it in a SearchMessageWithDeliveryId object. This class is defined by skiddoo-aggregator:

public class SearchMessageWithDeliveryId implements Serializable {
  private static final long serialVersionUID = 1L;
  private final SearchQuery searchQuery;
  private final long deliveryId;

  // Builder and getters omitted for brevity
}

Once skiddoo-aggregator receives the message, it has to send back a ConfirmDelivery message (a simple object which contains the deliveryId generated by akka-persistence) to acknowledge the successful reception. When our sending actor receives the ConfirmDelivery message, it calls the method confirmDelivery(long deliveryId) to record that the message previously sent has been received by the remote system. It is important to do this, otherwise akka-persistence will keep trying to send all the messages whose deliveryId hasn't been confirmed.

This is how you get the at-least-once delivery semantics: by persisting messages and waiting for the deliveryId to come back. If you don't call confirmDelivery, akka-persistence will keep sending the messages again and again until the associated deliveryId gets confirmed.
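To make the mechanics concrete, here is a simplified stand-alone model (plain Java, not the actual akka-persistence implementation) of what deliver and confirmDelivery track:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AtLeastOnceDemo {
  private long nextDeliveryId = 0;
  // Messages delivered but not yet confirmed, keyed by deliveryId.
  private final Map<Long, String> pending = new LinkedHashMap<>();

  // Mirrors deliver(): assign an id and track the message as unconfirmed.
  long deliver(String message) {
    long id = ++nextDeliveryId;
    pending.put(id, message);
    return id;
  }

  // Mirrors confirmDelivery(): the destination acknowledged this id.
  void confirmDelivery(long deliveryId) {
    pending.remove(deliveryId);
  }

  public static void main(String[] args) {
    AtLeastOnceDemo sender = new AtLeastOnceDemo();
    long id1 = sender.deliver("search SYD->LAX");
    sender.deliver("search MEL->SIN");
    sender.confirmDelivery(id1); // only the first one is acknowledged

    // On the next redelivery tick, only unconfirmed messages go out again.
    System.out.println(sender.pending.values());
  }
}
```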

Recovering from a JVM crash

There are multiple reasons for your JVM to go down; one of them is a simple update of your application. The good news is: recovery comes for free if you already use akka-persistence. By default, akka-persistence will save every object given to the persist method on disk, in a binary journal. The messages that have not been sent, or not been confirmed, will automatically be replayed to the receiveRecover method of your actor when the Akka system starts up. Then it's up to you to do what you want with them by overriding that method.
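Conceptually, recovery is just a replay of the journal: re-apply every persisted event, and whatever deliveries remain unconfirmed at the end must be re-sent. A toy illustration (plain Java, with a made-up journal encoding, not the real akka-persistence format):

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RecoveryDemo {
  public static void main(String[] args) {
    // A journal as it might look before the crash:
    // "S:<id>" = a SearchMessage was persisted and delivered,
    // "C:<id>" = its delivery was confirmed.
    List<String> journal = List.of("S:1", "C:1", "S:2", "S:3", "C:3");

    // Replaying the journal rebuilds the set of unconfirmed deliveries,
    // which is exactly what must be re-sent after a restart.
    Set<String> unconfirmed = new LinkedHashSet<>();
    for (String event : journal) {
      String id = event.substring(2);
      if (event.startsWith("S:")) {
        unconfirmed.add(id);
      } else {
        unconfirmed.remove(id);
      }
    }
    System.out.println(unconfirmed); // delivery 2 was never confirmed
  }
}
```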

Pretty cool, right?

By simply extending AbstractPersistentActorWithAtLeastOnceDelivery, we get a lot of features for almost nothing. If our API crashes, the messages which have not been sent will be sent again on startup. If skiddoo-aggregator crashes, it will not send back the ConfirmDelivery message, which means the API will keep trying to send the messages until it receives a confirmation, that is, until skiddoo-aggregator is live again: no data is lost and the system recovers automatically. Pretty fancy, and we love it! Thank you Typesafe.
