Akka without the Actors

Welcome to the Localytics Engineering Blog! I hope this post is super interesting. Just wanted to drop a note to say that we're hiring Scala engineers right now, so if you're interested, check out the job description. Enough business! Enjoy the article!

Akka Actors aim to provide "simple and high-level abstractions for concurrency and parallelism." We've made good use of them at Localytics, but I found them in practice to be cumbersome and low-level. I looked for an alternative approach, and just as I was about to give up, an attractive option fell in my lap: Akka Streams. The following explores why Streams is a simpler, higher-level solution.

What's wrong with Actors?

My initial aversion came from just how much boilerplate you find in actor-based code. Every line of code is another liability waiting to come back and bite me, so the less I have to write, the better. Let's see what Hello World looks like with an actor:

package main

import akka.actor.{ActorSystem, Actor, Props}

class HelloActor extends Actor {
  def receive: PartialFunction[Any, Unit] = {
    case s: String => println(s + "!")
    case _ => println("what am I even supposed to do here?")
  }
}

object Program {

  def run(): Unit = {
    val system = ActorSystem("hello-world")
    val actor = system.actorOf(Props[HelloActor], name = "helloactor")
    actor ! "Hello World"
    system.shutdown()
  }
}

Besides verbosity, there are subtle yet serious problems lurking in actor code like this. For example:

"By default actors hard-code the receiver of any messages they send. If I create an actor A that sends a message to actor B, and you want to change the receiver to actor C you are basically out of luck."

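To make the quoted complaint concrete, here is a minimal sketch. The Recorder and Exclaimer actors are hypothetical, and the code uses the same Akka 2.3-era API as the rest of this post (newer Akka versions replace system.shutdown() with system.terminate()). Exclaimer can only forward to the ActorRef it was created with. Sending its output to a different actor means changing the class or the code that wires it up.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object HardCodedReceiver {

  // Hypothetical actor "B": hands whatever string it receives to a Promise.
  class Recorder(result: Promise[String]) extends Actor {
    def receive: Receive = {
      case s: String => result.trySuccess(s)
    }
  }

  // Hypothetical actor "A": its receiver is fixed when A is created.
  // Sending to some other actor "C" means changing this class or its wiring.
  class Exclaimer(next: ActorRef) extends Actor {
    def receive: Receive = {
      case s: String => next ! (s + "!")
    }
  }

  def run(msg: String): String = {
    val system = ActorSystem("hard-coded-receiver")
    try {
      val result = Promise[String]()
      val b = system.actorOf(Props(new Recorder(result)))
      val a = system.actorOf(Props(new Exclaimer(b)))
      a ! msg
      Await.result(result.future, 3.seconds)
    } finally {
      system.shutdown() // Akka 2.3-era call; terminate() in newer versions
    }
  }
}
```

Calling HardCodedReceiver.run("Hello World") should return "Hello World!".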
A better way

Just as I was giving up hope and getting ready to begrudgingly subject another microservice to the gripes listed above, our Chief Software Architect Andrew Rollins gave an impassioned talk about reactive streams at one of our regular engineering lunch-and-learn sessions. Angels sang and I saw a light come down from the sky. This was the answer I was looking for.

After a bit of Googling and experimenting, I decided to bet the success of my microservice on the Akka Streams library. I found that it solved each of the major problems I had with actor-based code. To start, here is the same Hello World, written as a stream:

package main

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.ExecutionContext.Implicits.global

object StreamProgram {

  def run(): Unit = {
    implicit lazy val system = ActorSystem("example")
    implicit val materializer = ActorMaterializer()
    Source(List("Hello World"))
      .map((s: String) => s + "!")
      .runWith(Sink.foreach(println))
      .onComplete {
        case _ => system.shutdown()
      }
  }
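}

Streams also compose. Because no stage hard-codes the receiver of its output, the same program can send each element through two flows at once, wired together with the FlowGraph DSL:

package main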

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.ExecutionContext.Implicits.global

object StreamProgram {

  val sayFlow: Flow[String, String, Unit] =
    Flow[String].map { s =>
      s + "."
    }

  val shoutFlow: Flow[String, String, Unit] =
    Flow[String].map { s =>
      s + "!!!!"
    }

  val sayAndShoutFlow: Flow[String, String, Unit] =
    Flow() { implicit b =>
      import FlowGraph.Implicits._

      val broadcast = b.add(Broadcast[String](2))
      val merge = b.add(Merge[String](2))

      broadcast ~> sayFlow ~> merge
      broadcast ~> shoutFlow ~> merge
      (broadcast.in, merge.out)
    }

  def run(): Unit = {
    implicit lazy val system = ActorSystem("example")
    implicit val materializer = ActorMaterializer()
    Source(List("Hello World"))
      .via(sayAndShoutFlow)
      .runWith(Sink.foreach(println))
      .onComplete {
        case _ => system.shutdown()
      }
  }
}

This makes it trivial to change your data flow, even deciding at run time which stages to wire together, if your program calls for it. Sequential operations are as simple as chaining calls to via. With minimal use of the FlowGraph DSL, we can achieve more interesting things, such as sayAndShoutFlow, which broadcasts each element to two flows and merges their results.
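The following sketch illustrates both points. It re-declares sayFlow and shoutFlow from the listing above so that it stands alone. The loud flag is a hypothetical run-time input, and runFold is used only to collect results for inspection. Chained via calls apply stages in order, and an ordinary if expression chooses the pipeline when the program runs.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

object ChainedFlows {

  val sayFlow = Flow[String].map(s => s + ".")
  val shoutFlow = Flow[String].map(s => s + "!!!!")

  // Chooses the pipeline from a run-time flag; stages run in via order.
  def pipeline(loud: Boolean) =
    if (loud) sayFlow.via(shoutFlow) else sayFlow

  def run(input: List[String], loud: Boolean): Vector[String] = {
    implicit val system = ActorSystem("chained-flows")
    implicit val materializer = ActorMaterializer()
    try {
      val result = Source(input)
        .via(pipeline(loud))
        .runFold(Vector.empty[String])(_ :+ _) // collect outputs in order
      Await.result(result, 5.seconds)
    } finally {
      system.shutdown() // terminate() in newer Akka versions
    }
  }
}
```

With loud = true, the input "Hello World" comes out as "Hello World.!!!!". With loud = false, it comes out as "Hello World.".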

This principle is well illustrated in a slide from Typesafe's webinar "Reactive Streams 1.0.0 and Why You Should Care."

This also seems like an appropriate time to come clean and admit that the title of this post is a lie. Akka Streams uses actors under the hood to implement the Reactive Streams specification. The beauty of this is that I get all the benefits of actors without having to worry about them in my code.

The results

When I first told Jacob Schlather, our resident Akka-tuning expert (and actor proponent), that I did not want to use actors in my code, he challenged me to keep CPU usage above 70 percent.

His reasoning was that many of our services do their slowest work in IO, so one of the biggest wins available is keeping CPU-bound work from waiting on IO to complete.

I was thrilled to find that my first crack at a streams-based implementation averaged 90 percent CPU consumption, with absolutely no tuning. Here's the current CPU profile for the stream-based service in production:

This came for free! It's all thanks to the Reactive Streams implementation that Akka Streams gives you. This service does a significant amount of network IO inside the stream, and it's remarkable that the underlying implementation is smart enough to work around the slower parts and make full use of the available resources.

The next question on my mind was, "How will it scale?" We've since tripled the amount of work this service does, and without scaling up hardware, our throughput has increased along with it:

Today, the service keeps up with our incoming data pipeline using a quarter of the resources required by a non-stream-based service doing comparable work. We are thrilled with the outcome.

Picked up along the way

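There were definitely a few challenges along the way. Here are a few takeaways that will make our next streams-based project easier to build.

The first concerns a fast stage feeding a slow one. Pairing conflate with mapConcat lets the fast stage keep producing, batching up its results while the slow stage catches up: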

package main

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.ExecutionContext.Implicits.global

object ConflateComparison {

  val fastStage: Flow[Unit, Unit, Unit] =
    Flow[Unit].map(_ => println("this is the fast stage"))

  val slowStage: Flow[Unit, Unit, Unit] =
    Flow[Unit].map { _ =>
      Thread.sleep(1000L)
      println("this is the SLOWWWW stage")
    }

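  // Accumulate in arrival order: prepending (u :: l) would reverse each batch,
  // which is harmless for Units but not for real data.
  val conflateFlow: Flow[Unit, Unit, Unit] =
    Flow[Unit].conflate(u => Vector(u))((batch, u) => batch :+ u)
              .mapConcat(identity)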

  val withConflate: Source[Unit, Unit] =
    Source(List.fill(10)(()))
      .via(fastStage)
      .via(conflateFlow)
      .via(slowStage)

  val withoutConflate: Source[Unit, Unit] =
    Source(List.fill(10)(()))
      .via(fastStage)
      .via(slowStage)

  def run(s: Source[Unit, Unit]): Unit = {
    implicit lazy val system = ActorSystem("example")
    implicit val materializer = ActorMaterializer()
    s.runWith(Sink.ignore)
      .onComplete { _ => system.shutdown() }
  }
}

While you might never stream Units like this in real-life code, comparing the output of these programs makes the behavior of conflate -> mapConcat apparent:

scala> ConflateComparison.run(ConflateComparison.withoutConflate)
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the SLOWWWW stage
this is the fast stage
this is the fast stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the fast stage
this is the fast stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the fast stage
this is the fast stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage

scala> ConflateComparison.run(ConflateComparison.withConflate)
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the fast stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage
this is the SLOWWWW stage

Note that the second run, using conflate -> mapConcat, does all the fast work first. As soon as the slow stage can't keep up with the fast stage, conflate kicks in. Instead of backpressuring the fast stage, it accumulates the fast stage's results into a collection. mapConcat then flattens that collection back into individual stream elements, handing them off as the slow stage consumes them.

This is a really nice way to optimize resource usage around a slow step in your program.
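The same trick works for real data, as long as each batch keeps its order. In this sketch, the slow function is a hypothetical stand-in for an expensive stage. conflate(seed)(aggregate) is the Akka Streams 1.x signature used above; later versions renamed it conflateWithSeed. Each batch is a Vector that is appended to, so elements reach the slow stage in arrival order.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

object OrderedConflate {

  // Batches elements while downstream is busy, then re-emits them one by one.
  // Appending to a Vector preserves arrival order within each batch.
  def orderedConflate[A] =
    Flow[A].conflate(a => Vector(a))((acc, a) => acc :+ a)
      .mapConcat(identity)

  // Hypothetical slow stage standing in for expensive work.
  def slow(i: Int): Int = { Thread.sleep(20L); i }

  def run(n: Int): Vector[Int] = {
    implicit val system = ActorSystem("ordered-conflate")
    implicit val materializer = ActorMaterializer()
    try {
      val result = Source(1 to n)
        .via(orderedConflate[Int])
        .map(slow)
        .runFold(Vector.empty[Int])(_ :+ _)
      Await.result(result, 10.seconds)
    } finally {
      system.shutdown() // terminate() in newer Akka versions
    }
  }
}
```

run(20) should return the numbers 1 to 20 in order, however the elements happen to be batched. Because conflate never backpressures, the batch can grow without bound if the slow stage falls permanently behind.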

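Another takeaway: blocking IO should run on a dedicated dispatcher so that it doesn't tie up the threads running the rest of the stream. The streams library provides a SynchronousFileSource that does this for you given a File, but if you're working with an InputStream other than a file on local disk, you need to do it manually.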

Here's an example of how you would do that for a specific section of a stream (note the call to withAttributes):

package main

import akka.actor.ActorSystem
import akka.util.ByteString
import akka.stream.{ActorMaterializer, ActorAttributes}
import akka.stream.io._
import java.io.{InputStream, ByteArrayInputStream}
import akka.stream.scaladsl._
import scala.concurrent.ExecutionContext.Implicits.global

object StreamFile {

  def run(): Unit = {
    implicit lazy val system = ActorSystem("example")
    implicit val materializer = ActorMaterializer()
    val is = new ByteArrayInputStream("hello\nworld".getBytes)
    InputStreamSource(() => is)
      .withAttributes(ActorAttributes.dispatcher("akka.stream.default-file-io-dispatcher"))
      .via(Framing.delimiter(
        ByteString("\n"), maximumFrameLength = Int.MaxValue,
        allowTruncation = true))
      .map(_.utf8String)
      .runWith(Sink.foreach(println))
      .onComplete(_ => system.shutdown())
  }
}

Another takeaway: async operations are great for IO, but for CPU-bound work, the overhead of creating a Future or allocating a thread is a needless cost and a net loss.
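Here is a minimal sketch of that split. The fetch function is a hypothetical stand-in for a network call; it sleeps inside a Future purely to simulate latency. The parse function stands in for CPU-bound work. The IO step goes through mapAsync, which keeps up to four calls in flight and still emits results in input order. The CPU-bound step stays a plain map, so no Future is created for it.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object IoVersusCpu {

  // Hypothetical IO call; the sleep only simulates network latency.
  def fetch(id: Int): Future[String] = Future {
    Thread.sleep(50L)
    s"line-$id"
  }

  // Hypothetical CPU-bound work: cheap and synchronous, so no Future for it.
  def parse(line: String): Int = line.stripPrefix("line-").toInt * 2

  def run(ids: List[Int]): Vector[Int] = {
    implicit val system = ActorSystem("io-vs-cpu")
    implicit val materializer = ActorMaterializer()
    try {
      val result = Source(ids)
        .mapAsync(4)(fetch) // up to 4 calls in flight, output order preserved
        .map(parse)         // plain map: no Future or thread hand-off
        .runFold(Vector.empty[Int])(_ :+ _)
      Await.result(result, 10.seconds)
    } finally {
      system.shutdown() // terminate() in newer Akka versions
    }
  }
}
```

run(List(1, 2, 3)) should return Vector(2, 4, 6). If output order doesn't matter, mapAsyncUnordered can emit results as soon as they complete.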

The last takeaway concerns acknowledging queue messages, which was almost a show-stopper for me. The service in question is a queue consumer and has to process a large amount of work per queue message. Pulling all of that work in at once defeats the point of the stream, but if I don't know where the work ends, how will I know where to tack on the queue message to be acknowledged?

A bit of Googling showed that this is not exactly a solved problem, but it is a problem that a few people have clearly encountered.

Tim Harper went as far as extending the core components provided by the Akka Streams library; his work provides a version that supports acknowledgement.

Just at the point where I was beginning to consider a different route altogether, I came up with this simple "hack":

You could argue this is dirty, but it so far does exactly what I need, and until I'm shown an equally simple and better alternative, I'm sticking with it.

package main

import akka.actor.ActorSystem
import akka.util.ByteString
import akka.stream.{ActorMaterializer, ActorAttributes}
import akka.stream.io._
import java.io.{InputStream, ByteArrayInputStream}
import akka.stream.scaladsl._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AckedFileStream {

  class QueueMessage(str: String) {
    def ack(): Unit = println("acked")
    lazy val stream = new ByteArrayInputStream(str.getBytes)
  }

  sealed trait Element
  case class StringElement(s: String) extends Element
  case class QueueElement(qm: QueueMessage) extends Element

  def fileStream(m: QueueMessage): Source[StringElement, Future[Long]] =
    InputStreamSource(() => m.stream)
      .via(Framing.delimiter(
        ByteString("\n"), maximumFrameLength = Int.MaxValue,
        allowTruncation = true))
      .map(s => StringElement(s.utf8String))

  def run(ls: List[String]): Unit = {
    implicit lazy val system = ActorSystem("example")
    implicit val materializer = ActorMaterializer()
    Source(ls.map(s => new QueueMessage(s)))
      .map { qm =>
        // Append the queue message after the last line of its file
        fileStream(qm)
          .concat(Source(List(QueueElement(qm))))
      }
      .flatten(FlattenStrategy.concat)
      .runWith(Sink.foreach[Element] {
        case QueueElement(qm) => qm.ack()
        case StringElement(s) => println(s)
      })
      .onComplete(_ => system.shutdown())
  }
}

This small example is actually representative of the work we are now doing with streams:

  1. Dequeueing a message that points to a remote input source...
  2. streaming that source to do work on each line...
  3. ...then acking the queue message on completion of the file.

By creating a separate stream for the file and then concatenating the queue element to the end of it, I am guaranteed that the queue message arrives at the Sink immediately after the last line of the input stream. The Element type family gives every item in the stream a common type all the way to the Sink.

Running the code yields the following:

scala> val ss = List("hello\nworld", "hello\nagain")
scala> AckedFileStream.run(ss)
hello
world
acked
hello
again
acked
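The Element family could also be expressed with Either, using Right for a line of work and Left for the acknowledgement marker. The sketch below is a variation on the hack above, not a replacement for it. Msg is a hypothetical stand-in for a queue message, and acknowledgements are turned into strings so the ordering can be checked. It uses the same Akka Streams 1.x calls as the listing above; flatten(FlattenStrategy.concat) became flatMapConcat in later versions.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.duration._

object AckOrdering {

  // Hypothetical stand-in for a queue message: an id plus the lines it points to.
  final case class Msg(id: Int, lines: List[String])

  // Right(line) is work to do; Left(id) marks "ack message id", appended last.
  def expand(m: Msg) =
    Source(m.lines.map(l => Right(l): Either[Int, String]))
      .concat(Source.single(Left(m.id): Either[Int, String]))

  def run(msgs: List[Msg]): Vector[String] = {
    implicit val system = ActorSystem("ack-ordering")
    implicit val materializer = ActorMaterializer()
    try {
      val result = Source(msgs)
        .map(expand)
        .flatten(FlattenStrategy.concat) // one message's substream at a time
        .map {
          case Right(line) => line
          case Left(id)    => s"ack $id"
        }
        .runFold(Vector.empty[String])(_ :+ _)
      Await.result(result, 5.seconds)
    } finally {
      system.shutdown() // terminate() in newer Akka versions
    }
  }
}
```

For two messages whose lines are hello/world and hello/again, run places "ack 1" and "ack 2" directly after each message's last line, mirroring the output above.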

All said and done, Akka Streams has been a huge success for us. We've cut down our boilerplate, increased readability and maintainability, reduced time spent tuning performance, and picked up a few tricks to make our next stream-based service faster to build.

Have your own experiences with Akka Streams? Share in the comments.