Akka Does Async

Original Author: Sean Parsons

What On Earth Is This All About?

To those who have not seen it, I’m going to introduce the highly useful Akka library and show how asynchronous software can be developed easily using it. Most programming languages provide some kind of threading capability, from fork() in C to Threads and ExecutorService in Java and a lot of variation in between. The main downside to these models is that they tend to require a lot of manual co-ordination on the part of the programmer. For example spawning a Thread directly in Java to do some work is fine, but requires a hand rolled hook to catch the case where the Runnable throws an exception, which results in you often writing more code to handle that rather than the actual logic you’re interested in.

 

Akka And Actors.

Akka adopts the Actor Model to alleviate a lot of the issues with normal concurrent programming (erlang and Scala are languages that have native support for actors). Akka goes much further than the stock Scala actors and also supports writing actors in Java as well, with clean API support for both languages. Actor programming in Akka using Scala is based around pattern matching, which I explored in my previous post, the first example from there can be easily translated into an actor as we can see here:

1
 
  2
 
  3
 
  4
 
  5
 
  6
 
  7
 
  8
 
  9
 
  10
 
  11
 
  

class IntMatchActor extends Actor {
  def receive = {
    case 1 => self.reply(“This is number 1.”)
    case 2 => self.reply(“This is number 2, it comes after 1.”)
    case _ => self.reply(“This value is neither 1 nor 2.”)
  }
}
val intMatchActor = actorOf[IntMatchActor].start
println(intMatchActor !! 1)
println(intMatchActor !! 99)

The main differences are that it inherits from Actor, calls a reply method on the self instance to give the response to the caller and rather than calling a method on the actor in the conventional sense it passes a message to the actor. That operator (!!) on the surface is where the special sauce is held, it puts the message into a mailbox for the actor to process asynchonously. I hear you cry “But this is no different to before!”, however there are variations on that operator which do things differently:

  • ! – This is a fire and forget call which immediately returns control to the caller, but means any response from the call wont even be sent by the callee.
  • !!! – In this case the operator returns an instance of Future which can be checked for completion at a later date. Two IO intensive jobs could be initiated at the same time and their results combined once they have both completed as opposed to running them sequentially, as seen below:
1
 
  2
 
  3
 
  4
 
  5
 
  6
 
  7
 
  8
 
  9
 
  10
 
  11
 
  12
 
  13
 
  14
 
  15
 
  

val firstLongProcessActor = actorOf[FirstLongProcessActor].start
val secondLongProcessActor = actorOf[SecondLongProcessActor].start
val startTime = currentTimeMillis
val firstFuture = firstLongProcessActor !!! 1000
val secondFuture = secondLongProcessActor !!! 1000
Futures.awaitAll(List(firstFuture, secondFuture))
val timeTaken = currentTimeMillis startTime
println(“Process took “ + timeTaken + “ms.”)
// Output is:
// 1: Sleeping for 1000ms.
// 2: Sleeping for 1000ms.
// 1: Slept for 1000ms.
// 2: Slept for 1000ms.
// Process took 1006ms.

 

Making Sure When Things Go Pop You Don’t Care.

As software developers we go to pains to ensure that problems are averted, handled and/or ignored.  Rather than trying to cover every eventuality (try/catch expressions more than 2 levels deep are when I start to cry), it’s much better to embrace failure, expect it and cope with it. Two prime examples of this would be services on the end of a wire, like a database server of some form or a misbehaving third party library that explodes after 50,000 calls have been made of it. Supervisors can keep an eye on at least 1 actor and restart those appropriately effectively invisibly to the callee.

Here’s a simple example of the restart mechanic, whereby the actor BrokenActor uses a class called BrokenLibrary to total up some arbitrary values, but when it exceeds 10 it throws an IllegalStateException:

1
 
  2
 
  3
 
  4
 
  5
 
  6
 
  7
 
  8
 
  9
 
  10
 
  11
 
  12
 
  13
 
  14
 
  15
 
  16
 
  17
 
  18
 
  19
 
  20
 
  21
 
  

val supervisor = Supervisor(SupervisorConfig(
  AllForOneStrategy(List(classOf[IllegalStateException]), Some(3), Some(1000)),
  Supervise(actorOf[BrokenActor], Permanent) :: Nil
))
supervisor.start
val brokenActor = supervisor.children.head
printlnRequestResult(brokenActor, 1)
printlnRequestResult(brokenActor, 5)
try {
  printlnRequestResult(brokenActor, 20)
} catch {
  case _=> println(“Expected exception thrown.”)
}
printlnRequestResult(brokenActor, 5)
// Output is:
// Request = 1, Result = Some(1)
// Request = 5, Result = Some(6)
// Expected exception thrown.
// Creating new BrokenLibrary.
// Request = 5, Result = Some(5)

While we still get the exception that occurred, the supervisor has invisibly restarted the actor back to a stable state before the next request is made against it.

 

There’s A Lot More.

This one post has only really scratched the surface of Akka other important aspects are the persistence of mailboxes, software transactional memory, integration with AMQP servers and with Apache Camel.  I intend on covering more of these in future posts and looking at how Akka performs at some later date.

 

Until Next Time…

As with my last post (and I hope every post), I’ve created a SBT project that will allow you to play with all the things discussed in this post to your hearts content, which is at the following URL: