In the first post in this series, we discussed some of the basic concepts of asynchronous programming and how they can be applied in Scala. In the second post, we explored Execution Contexts and a few advantages and disadvantages of some examples. In this post, we will go a step further and venture into the world of Akka and the Actor Model. While async computing lets us use CPU resources effectively, it comes at the cost of concurrency nightmares. Two threads operating in parallel may attempt to mutate the same variable in memory, so the outcome depends on which thread gets there first (a race condition). The actor model helps us manage concurrency and state.

What is “Concurrency”?

To put it simply, “concurrency” means that two (or more) operations are in progress at the same time. Handling it safely means letting those operations use the same data without interfering with each other. In the world of immutable data structures, that safety comes for free because the data will never change; operations can work in parallel and trust that the data will remain consistent.

But clocks tick forward and we move in a dynamic world. As functional programmers, we try to keep things immutable wherever possible, but real-world applications generally require at least some minimal mutable state. In the case of a web service handling hundreds of requests simultaneously, mutations to a shared state may happen frequently. In certain circumstances, a mutation from one thread may cause unexpected results in another thread. If all of these requests are allowed free rein to modify that state, chaos is inevitable.
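To make the problem concrete, here is a minimal sketch using only the Scala and Java standard libraries (no Akka yet). The names `SharedCounterDemo`, `hammer`, `unsafeTotal`, and `safeTotal` are made up for this illustration. Several threads increment the same counter: the plain `var` can lose updates, while the `AtomicInteger` version, one conventional remedy, cannot.

```scala
import java.util.concurrent.atomic.AtomicInteger

object SharedCounterDemo {
  // Starts `threads` threads that each call `increment` `perThread` times,
  // then waits for all of them to finish.
  def hammer(threads: Int, perThread: Int)(increment: () => Unit): Unit = {
    val workers = (1 to threads).map { _ =>
      new Thread(() => (1 to perThread).foreach(_ => increment()))
    }
    workers.foreach(_.start())
    workers.foreach(_.join())
  }

  // Unsafe: `count += 1` is a read, an add, and a write.
  // Two threads can read the same old value, so one update gets lost.
  def unsafeTotal(threads: Int, perThread: Int): Int = {
    var count = 0
    hammer(threads, perThread)(() => count += 1)
    count
  }

  // Safe: AtomicInteger performs the read-modify-write as one indivisible step.
  def safeTotal(threads: Int, perThread: Int): Int = {
    val count = new AtomicInteger(0)
    hammer(threads, perThread)(() => count.incrementAndGet())
    count.get()
  }
}
```

Calling `SharedCounterDemo.safeTotal(4, 10000)` always returns 40000. `unsafeTotal(4, 10000)` may return anything up to 40000, and the exact number can change from run to run, which is exactly the kind of uncertainty we want to avoid.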

There are many methods for implementing safe concurrency into an application. The one we’ll be discussing here is the Actor Model.

What is the “Actor Model”?

An Actor Model (or Actor System) is a hierarchical structure of Actors. An Actor is an entity which contains a private, mutable state and can receive and respond to messages. An Actor also has a parent and child actors. This hierarchical structure provides some excellent advantages like fault tolerance, but those are outside the scope of this particular post. The two key aspects are private state and messages.

Since an Actor’s state is “private”, it can be trusted that only that Actor is allowed to modify its state. No other outside entities can modify (or even read) the Actor’s state. While an Actor can technically modify its state whenever it wants to, it is considered best practice to only modify that state in response to a Message it receives. A Message is essentially just a small package of data that is queued up in a mailbox and received by a specific Actor. Each Actor has its own mailbox of messages, and messages are handled by the Actor one-at-a-time. When an Actor receives a message, it chooses how to interpret the message and what actions to take as a result. The important part, however, is that a new message won’t be plucked out of the mailbox until the previous one has been fully handled. In the context of Akka’s implementation, that “message handler” is represented as a method, and the next message won’t be received until that method returns from handling the previous one.

Using this mechanism, you can trust that external entities will never have direct access to an Actor’s mutable state, and since an Actor will only ever process a single message at a time, the Actor needn’t worry about managing concurrency. The actor system and message dispatcher handle the concurrent mailbox queueing mechanism, and that’s really all that’s needed.
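The mailbox idea can be sketched without Akka at all. The toy class below is not how Akka implements actors; it only mimics the two properties described above, assuming a single-threaded executor is an acceptable stand-in for a mailbox. `ToyCounterActor`, `tell`, and `stopAndGetCount` are names invented for this sketch.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// A toy stand-in for an actor: NOT Akka's implementation, only the idea.
final class ToyCounterActor {
  // Private mutable state: only code run by the mailbox thread touches it.
  private var count = 0

  // A single-threaded executor plays the role of the mailbox:
  // submitted messages queue up and are handled one at a time, in order.
  private val mailbox = Executors.newSingleThreadExecutor()

  // "Sending a message" only enqueues work; the caller never touches `count`.
  def tell(message: String): Unit =
    mailbox.execute(() => receive(message))

  // The message handler. The next message is not started until this returns.
  private def receive(message: String): Unit = message match {
    case "increment" => count += 1
    case _           => () // ignore anything else
  }

  // Drains the mailbox and returns the final state (for demonstration only).
  def stopAndGetCount(): Int = {
    mailbox.shutdown()
    mailbox.awaitTermination(10, TimeUnit.SECONDS)
    count
  }
}
```

Any number of threads can call `tell("increment")` at the same time, and no increments are lost, because the handler itself never runs concurrently with itself.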

Akka Actors

Akka is an open-source framework for implementing the Actor Model. Due to the flexibility of the Actor Model, several other frameworks have stemmed off of Akka Actors, such as Akka Streams and subsequently Akka HTTP. But at the core of it all are Akka Actors.

Actors live inside of an ActorSystem. The ActorSystem handles the actor mailboxes, the passing of messages, and the ExecutionContext (called the “dispatcher” in this context). As mentioned previously, Actors live in a hierarchical structure where actors have children, and those children have children, etc. For now, let’s just deal with a flat structure where any Actors we create are created at the “top” level of the ActorSystem (“top” isn’t quite accurate since there is a default top-level Guardian actor which acts as the parent of these other actors, but we’ll ignore that for now).

To start, we need to create an ActorSystem, which is remarkably simple to do:

import akka.actor.ActorSystem

implicit val system: ActorSystem =
     ActorSystem()

And voila, we now have an ActorSystem. In general, you only want to do this once; it’s ideal practice to only have a single ActorSystem running inside of a single JVM. While not tremendously expensive, they do have an associated cost, such as creating the “dispatcher” and all of the system threads that come with it. Additionally, having more than one ActorSystem can significantly confuse the code. Instead, just declare a single ActorSystem and pass it around as an implicit parameter wherever it is needed.

Creating a basic actor is also remarkably simple. We’ll expand this out with some better practices in a later blog post, but for now, a barebones actor looks like this:

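import akka.actor.{Actor, ActorRef, Props}

// Placeholder for whatever work the actor should perform
def doSomething(): Unit = ()

// Declared with `def` rather than `val`: an Actor can't be instantiated
// directly with `new` outside of Akka (it throws an ActorInitializationException).
// Props takes its argument by name, so Akka creates the instance itself.
def actor: Actor =
     new Actor {
          def receive: Receive = {
               case "Test Message" => doSomething()
          }
     }

val actorRef: ActorRef =
     system.actorOf(Props(actor))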

Again, this is just the barebones example. There are more preferred ways of creating new actors which we’ll get to later, but at its core, this is all you need. The Actor trait declares an abstract method `def receive: Receive`, where `Receive` is just an alias for `PartialFunction[Any, Unit]`. This is the handler for the messages that get dispatched to this Actor. The Actor is allowed to do whatever it pleases with that particular message.
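Since the handler is an ordinary partial function, its shape can be explored without an ActorSystem. The sketch below redefines the `Receive` alias locally so that it needs no Akka dependency; `ReceiveShapeDemo` and `handled` are names made up for this illustration.

```scala
object ReceiveShapeDemo {
  // Same shape as Akka's `Receive` alias, redefined here so the snippet runs without Akka.
  type Receive = PartialFunction[Any, Unit]

  // Records what the handler did, so the effect can be inspected afterwards.
  val handled = scala.collection.mutable.ListBuffer.empty[String]

  val receive: Receive = {
    case "Test Message" => handled += "got the test message"
    case n: Int         => handled += s"got the number $n"
  }
}
```

`ReceiveShapeDemo.receive.isDefinedAt("Test Message")` is true, while `isDefinedAt(3.14)` is false because no case matches a Double. In a real Akka actor, a message that the handler isn't defined for is passed to the actor's `unhandled` method instead of throwing a MatchError.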

But what about State?

There are some use-cases where having a stateless Actor is beneficial, but in many cases, the Actor will have at least one piece of “mutable” state data that gets modified in response to certain messages. Say, for example, your Actor is meant to store some JSON documents, like a basic document database. We want to allow other systems to add new documents and retrieve existing ones.

Adding Documents

To implement that, let’s start with “add new documents” and introduce a few more puzzle pieces.

import play.api.libs.json._

case class AddDocument(id: String, value: JsValue)

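// `def` (not `val`) so that Akka creates the instance when it evaluates Props(databaseActor)
def databaseActor: Actor =
    new Actor {
        private def statefulReceive(state: Map[String, JsValue]): Receive = {
            case AddDocument(id, value) =>
                 // Build a new immutable Map that includes the added document
                 val updatedState = state + (id -> value)
                 val newHandler = statefulReceive(updatedState)
                 // Swap in the handler that closes over the updated state
                 context.become(newHandler)
        }

        def receive: Receive = statefulReceive(Map.empty)
    }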

There are a few new pieces here, so let’s walk through them.

First, we created a new case class called AddDocument. This class represents a message that gets interpreted by our Actor. It contains the document’s ID and the document’s value. Next, we declare our new Actor. In this new Actor, we have two methods: statefulReceive(…) and receive. The statefulReceive method has a parameter named “state”. We could have declared this “state” as a variable at the top level of the class and had the base “receive” method simply mutate that class-level variable. However, the preferred way to do it is to pass that “state” into a new “receive” handler, and set the Actor’s handler to that new handler.

The “statefulReceive” method interprets the provided message, determines the new “state” of the Actor, creates a new handler (by passing the updated state to `statefulReceive`), and sets the actor’s Receive handler to the newly created one (using `context.become(…)`). After `context.become(…)` is called, the new handler is set, and any subsequent messages that are received by the actor will flow through the new handler.

Regardless of whether you choose to use `context.become(…)` or directly update a mutable variable inside of the class, there’s one critical piece that needs to be considered: any mutations to the state or changes to the handler via `context.become(…)` must be completed before the current handler returns. This is important when the handler for the message requires some sort of async operation. For synchronous operations like the one defined in the previous example, you needn’t worry. But if that `AddDocument(…)` required calling out to some external API or requesting information from some other actor, you must be careful not to update the Actor’s state inside of the callback of that async request, because that callback runs later, on another thread, possibly while the Actor is in the middle of handling a different message. You should instead have that async callback send a new message to this Actor, and the new message will handle any changes to state.
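One way to follow that advice is Akka's `pipeTo` pattern, which delivers the result of a Future to an actor as an ordinary message. The sketch below assumes classic (untyped) Akka actors and stores plain Strings instead of JSON to stay dependency-free. `AsyncUpdateSketch`, `DocumentValidated`, and `validate` are hypothetical names standing in for whatever async work a real application would do.

```scala
import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.Future

object AsyncUpdateSketch {
  // Documents are plain Strings here (an assumption, to avoid a JSON dependency).
  case class AddDocument(id: String, value: String)
  case class GetDocument(id: String)

  // Internal message that carries the async result back into the actor (hypothetical).
  private case class DocumentValidated(id: String, value: String)

  class DatabaseActor extends Actor {
    import context.dispatcher // ExecutionContext for the Future below

    // Stand-in for a call to an external API (hypothetical).
    private def validate(id: String, value: String): Future[DocumentValidated] =
      Future(DocumentValidated(id, value.trim))

    private def statefulReceive(state: Map[String, String]): Receive = {
      case AddDocument(id, value) =>
        // Do NOT call context.become inside the Future's callback.
        // Instead, have the result delivered to this actor's own mailbox.
        validate(id, value).pipeTo(self)

      case DocumentValidated(id, value) =>
        // Back inside the message handler, so changing state is safe again.
        context.become(statefulReceive(state + (id -> value)))

      case GetDocument(id) =>
        sender() ! state.get(id)
    }

    def receive: Receive = statefulReceive(Map.empty)
  }
}
```

If the Future fails, `pipeTo` sends an `akka.actor.Status.Failure` message instead, so a fuller version would add a case for that as well. Note that the document only becomes visible once the `DocumentValidated` message has been processed, not when `AddDocument` is received.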

Retrieving Documents

Adding documents is great, but the Actor is ultimately useless if we can’t get information out of it. We’ll take a similar approach of defining a new type of message, including that message in our receive handler, and having that handler reference the Actor’s “state”. But this time, we’ll go a step further and send the retrieved document back to the Actor that requested the information.

case class GetDocument(id: String)

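// `def` (not `val`) so that Akka creates the instance when it evaluates Props(databaseActor)
def databaseActor: Actor =
    new Actor {
        private def statefulReceive(state: Map[String, JsValue]): Receive = {
            case AddDocument(id, value) =>
                 …
            case GetDocument(id) =>
                 // Reply to whoever sent the request with an Option[JsValue]
                 sender() ! state.get(id)
        }

        def receive: Receive = statefulReceive(Map.empty)
    }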

Again, we take a similar approach of declaring a new type of message and configuring the Actor’s receive handler to interpret it accordingly. The new pieces of the puzzle are `sender()` and `!`. Inside of an Actor, calling `sender()` returns the ActorRef of the Actor that sent the `GetDocument(…)` message. The `!` method is the “tell” operator, which sends a new message to the desired ActorRef. So here, we’re determining who sent the original message, and “replying” back to it with an Option[JsValue] (the result of `state.get(id)`).

Since there were no mutations to the state, there is no need to create an updated state, nor update the Actor’s receiver method since nothing changed.

I have an Actor, but how do I use it?

We created our Actor and added it to the ActorSystem by doing:

implicit val system: ActorSystem = ...
// `def` (not `val`): Props evaluates this lazily, so Akka instantiates the Actor itself
def databaseActor: Actor =
      new Actor { … }
val databaseActorRef: ActorRef =
      system.actorOf(Props(databaseActor))

This sequence of steps results in what’s called an `ActorRef`. This `ActorRef` isn’t the actual actor itself; it’s more of a mailing address. When you want to send a letter to Grandma across the country, you generally write the letter, put it in an envelope, write down her mailing address on the envelope, write down your own address (the return address), and give it to the post office. Instead of you directly delivering the letter to Grandma, the post office handles everything for you. The post office has everything they need from you: the destination, the sender, and the letter itself.

This is very similar to how it works in Akka. The destination is the ActorRef, the sender is yourself (another ActorRef), the letter is whatever object/message is being sent, and the post office is the ActorSystem.

Sending a message in Akka is actually quite simple:

val databaseActorRef: ActorRef = …

databaseActorRef ! AddDocument("123", Json.obj("x" -> true))

Here, `databaseActorRef` is the destination and `AddDocument(…)` is the message, but what about the “sender”? Similar to the post office, you don’t technically need to provide a “sender” for your message. When `!` is called from outside of an Actor, as it is here, a default `Actor.noSender` is inserted (which effectively translates to “null” under the hood). Since we don’t expect a response for this message, that’s not an issue.
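When the message is sent from inside another Actor, the return address is filled in automatically. The sketch below assumes classic (untyped) Akka actors; `ReplyDemo`, `Ping`, `Pong`, `Ponger`, and `Pinger` are names invented for this illustration, and the `Promise` exists only so that code outside the actors can observe the reply.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object ReplyDemo {
  case object Ping
  case object Pong

  // Replies to whoever sent the Ping.
  class Ponger extends Actor {
    def receive: Receive = {
      case Ping => sender() ! Pong
    }
  }

  // Sends a Ping when it starts and records the reply in `received`.
  class Pinger(target: ActorRef, received: Promise[String]) extends Actor {
    // Inside an actor, `!` picks up the implicit `self` as the sender,
    // so Ponger's `sender()` is this actor's ActorRef.
    override def preStart(): Unit = target ! Ping

    def receive: Receive = {
      case Pong => received.success(s"Pong delivered to ${self.path.name}")
    }
  }

  def run(): String = {
    val system = ActorSystem("reply-demo")
    try {
      val received = Promise[String]()
      val ponger = system.actorOf(Props(new Ponger), "ponger")
      system.actorOf(Props(new Pinger(ponger, received)), "pinger")
      // Blocking with Await is for demonstration only.
      Await.result(received.future, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

`ReplyDemo.run()` returns "Pong delivered to pinger": the reply found its way back to the `Pinger` even though the `Ponger` was never told who that is.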

But when we do expect a response, we do things a bit differently:

val databaseActorRef: ActorRef = …

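import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

// How long the "ask" waits for a reply before the Future fails
implicit val askTimeout: Timeout =
     Timeout(5.seconds)

val askResult: Future[Any] =
     databaseActorRef ? GetDocument("123")

// The reply is untyped, so narrow it to the type we expect
val askResultDocument: Future[Option[JsValue]] =
     askResult.mapTo[Option[JsValue]]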

There are a couple of pieces in here. The first is the import for `akka.pattern.ask`. The “ask” pattern allows us to send a message to an Actor and expect a response in return. This is listed under “pattern” because under the hood, it still just uses the basic “tell” (aka `!`) messaging system. The difference, however, is that the “ask” pattern creates a temporary ActorRef that acts as the “sender” of the message.

Next, we declare an implicit timeout for the “ask”. If no reply arrives within that time, the returned Future fails with an `AskTimeoutException`. Then, we make the “ask” call to the ActorRef (using the `?` method/operator). When `databaseActor` replies to the sender of the message with the `Option[JsValue]` (the document), the reply goes to the temporary actor, which fulfills a Promise, which in turn provides that `Future[Any]`.

Notice, however, that the return type is `Future[Any]`, not `Future[Option[JsValue]]`. For now, let’s pretend all Actor messages are untyped. Akka Typed is a means of providing type-safety to actors, but that’s a battle for another day. For now, since the messages are untyped, we must `.mapTo[Option[JsValue]]` on that returned Future.

That’s largely all there is to it. For fire-and-forget messages (like AddDocument), just use the “tell” method (a.k.a. `!`). For messages which expect a response (like GetDocument), use the “ask” pattern (a.k.a. `?`), which returns a Future[Any].
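Putting the pieces together, here is a self-contained sketch of the whole round trip, assuming classic (untyped) Akka actors. To keep it free of a JSON dependency, documents are plain Strings rather than `JsValue`, and the actor is written as a small named class. `DocumentStoreDemo` and `run` are names made up for this example.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object DocumentStoreDemo {
  // Documents are plain Strings here (an assumption, to avoid a JSON dependency).
  case class AddDocument(id: String, value: String)
  case class GetDocument(id: String)

  class DatabaseActor extends Actor {
    private def statefulReceive(state: Map[String, String]): Receive = {
      case AddDocument(id, value) =>
        context.become(statefulReceive(state + (id -> value)))
      case GetDocument(id) =>
        sender() ! state.get(id)
    }

    def receive: Receive = statefulReceive(Map.empty)
  }

  // Adds one document, then looks up an existing ID and a missing one.
  def run(): (Option[String], Option[String]) = {
    val system = ActorSystem("document-store-demo")
    implicit val askTimeout: Timeout = Timeout(5.seconds)
    try {
      val db: ActorRef = system.actorOf(Props(new DatabaseActor))

      // Fire-and-forget: no reply expected
      db ! AddDocument("123", "hello")

      // Ask: each call returns a Future[Any] that we narrow with mapTo
      val found   = (db ? GetDocument("123")).mapTo[Option[String]]
      val missing = (db ? GetDocument("999")).mapTo[Option[String]]

      // Blocking with Await is for demonstration only.
      (Await.result(found, 5.seconds), Await.result(missing, 5.seconds))
    } finally {
      system.terminate()
    }
  }
}
```

`DocumentStoreDemo.run()` returns `(Some("hello"), None)`. In real code you would normally compose the Futures (for example with `map` or a for-comprehension) rather than block on them.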

Conclusion

The Actor Model provides numerous benefits, several of which were not discussed here today. But the primary benefit is its ability to simplify concurrency and avoid clunky locks and mutexes in your application code. Actors are lightweight objects which have a private state and receive messages. These actors have a very small footprint, thus thousands or millions of them can live on a single JVM. This allows you to take advantage of the performance benefits of async operations, while still preserving a safe and consistent state within the application. Each actor will only ever process one message at a time (thus concurrency is taken care of), but many actors can handle their respective messages in parallel (thus async is taken care of).

To recap their usage: define your actor, define the messages it can handle, wrap its creation in Props, and add it to the ActorSystem with `actorOf`. Adding it to the ActorSystem produces an ActorRef, which acts like a mailing address for the Actor, to which messages can be addressed and delivered by the ActorSystem.

 

Other posts in the Scala Series:
Gentle Introduction to Async Scala
Async Scala Part II