Haoyi's Programming Blog


Message-based Parallelism with Actors

Posted 2020-12-02

This blog post is an excerpt of Chapter 16 from the book Hands-on Scala Programming.

class SimpleUploadActor()(implicit cc: castor.Context) extends castor.SimpleActor[String]{
  def run(msg: String) = {
    val res = requests.post("https://httpbin.org/post", data = msg)
    println("response " + res.statusCode)
  }
}

Snippet 16.1: a simple actor implemented in Scala using the Castor library

Message-based parallelism is a technique that involves splitting your application logic into multiple "actors", each of which can run concurrently, and only interacts with other actors by exchanging asynchronous messages. This style of programming was popularized by the Erlang programming language and the Akka Scala actor library, but the approach is broadly useful and not limited to any particular language or library.

This chapter will introduce the fundamental concepts of message-based parallelism with actors, and how to use them to achieve parallelism in scenarios where the techniques we covered in Chapter 13: Fork-Join Parallelism with Futures cannot be applied. We will first discuss the basic actor APIs, see how they can be used in a standalone use case, and then see how they can be used in more involved multi-actor pipelines. The techniques in this chapter will come in useful later in Chapter 18: Building a Real-time File Synchronizer.


About the Author: Haoyi is a software engineer, and the author of many open-source Scala tools such as the Ammonite REPL and the Mill Build Tool. If you enjoyed the contents on this blog, you may also enjoy Haoyi's book Hands-on Scala Programming


For this chapter, we will be using the Castor library, which provides lightweight, typed actors for Scala:

import $ivy.`com.lihaoyi::castor:0.1.7`

We will be writing most of our code in Scala Scripts, which we will either load into the Ammonite Scala REPL for manual testing or test using a separate testing script. First, let us go into the core APIs that the Castor actor library exposes to users.

Castor Actors

At their core, actors are objects that receive messages via a send method, and asynchronously process those messages one after the other:

trait Actor[T]{
  def send(t: T): Unit

  def sendAsync(f: scala.concurrent.Future[T]): Unit
}

This processing happens in the background, and can take place without blocking. After a message is sent, the thread or actor that called .send() can immediately go on to do other things, even if the message hasn't been processed yet. Messages sent to an actor that is already busy will be queued up until the actor is free.

Note that Actor is parameterized on the type T, which specifies what messages a particular Actor is expected to receive. This is checked at compile time to make sure any message you send to an Actor is of the correct type.
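As a minimal sketch of the two methods above, the following script sends one message directly and one via a Future. CollectActor is a hypothetical actor written for this illustration, and its received field is left public only so that the script can inspect it once the actor is idle:

```scala
import $ivy.`com.lihaoyi::castor:0.1.7`
import scala.concurrent.Future

implicit val cc = new castor.Context.Test()

// Hypothetical actor for illustration: records every Int it receives
class CollectActor()(implicit cc: castor.Context) extends castor.SimpleActor[Int]{
  var received = Vector.empty[Int] // public only so this sketch can inspect it
  def run(msg: Int): Unit = received :+= msg
}

val collector = new CollectActor()

collector.send(1)                    // queued; send returns immediately
collector.sendAsync(Future(20 + 22)) // delivered once the Future completes
// collector.send("hello")           // would not compile: a String is not an Int

cc.waitForInactivity() // block until the actor has drained its queue
println(collector.received)
```

The commented-out line is the compile-time check in action: because collector is an Actor[Int], sending it a String is rejected by the compiler rather than failing at runtime.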

Actor Classes

Castor provides three primary classes you can inherit from to define actors:

SimpleActor

abstract class SimpleActor[T]()(implicit cc: Context) extends Actor[T]{
  def run(msg: T): Unit
}

SimpleActor works by providing a run function that will be run on each message.

BatchActor

abstract class BatchActor[T]()(implicit cc: Context) extends Actor[T]{
  def runBatch(msgs: Seq[T]): Unit
}

BatchActor allows you to provide a runBatch function that works on groups of messages at a time: this is useful when message processing can be batched together for better efficiency, e.g. making batched database queries or batched filesystem operations instead of many individual actions.

StateMachineActor

abstract class StateMachineActor[T]()(implicit cc: Context) extends Actor[T] {
  class State(val run: T => State)
  protected[this] def initialState: State
}

StateMachineActor allows you to define actors via a set of distinct states, each of which has a separate run callback that performs actions and returns the next state that we want the actor to transition to.

While all actors can maintain state in private fields and variables that are read and modified in the run or runBatch methods, StateMachineActor makes the state and state transitions explicit. This can make it easier to specify exactly which states are valid and how the actor transitions between them.

Contexts, Exceptions, and State

All Castor actors require an implicit castor.Context parameter, which is an extended ExecutionContext used to schedule and manage the execution of your actors. Having an implicit castor.Context in scope therefore also lets you perform Future operations that require an implicit ExecutionContext.

Any uncaught exception that is thrown while an actor is processing a message (or batch of messages, in the case of BatchActor) is reported to the castor.Context's reportFailure method. The default just prints to the console using .printStackTrace(), but you can override it to pass the exceptions elsewhere, e.g. to a remote error-aggregating service.

After an exception is thrown, the actor continues processing messages as before. The internal state of the actor is unchanged from the point where the exception was thrown. In the case of StateMachineActor, state transitions only happen after the run method completes successfully, and so messages that result in exceptions do not end up changing the state.
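The behavior described above can be sketched as follows. SumActor and the failures queue are hypothetical names used only for this illustration; the sketch overrides reportFailure to collect exceptions instead of printing them, and checks that the actor keeps processing messages after one of them fails:

```scala
import $ivy.`com.lihaoyi::castor:0.1.7`
import java.util.concurrent.ConcurrentLinkedQueue

// Collect failures instead of printing stack traces; a real application
// might forward them to an error-aggregating service here
val failures = new ConcurrentLinkedQueue[Throwable]()

implicit val cc = new castor.Context.Test() {
  override def reportFailure(t: Throwable): Unit = failures.add(t)
}

// Hypothetical actor for illustration: sums the integers it is sent
class SumActor()(implicit cc: castor.Context) extends castor.SimpleActor[String]{
  var total = 0 // public only so this sketch can inspect it afterwards
  def run(msg: String): Unit = total += msg.toInt // throws on non-numeric input
}

val summer = new SumActor()
summer.send("1")
summer.send("two") // NumberFormatException, passed to reportFailure
summer.send("3")   // still processed; the failed message left total untouched

cc.waitForInactivity()
println(s"total = ${summer.total}, failures = ${failures.size}")
```

Because msg.toInt throws before total is assigned, the failed message leaves the actor's state exactly as it was, and the later message is added on top of it.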

Castor actors are meant to manage mutable state internal to the actor. Note that it is up to you to mark the state private to avoid accidental external access. Each actor may run on a different thread, and the same actor may run on different threads at different times, so you should ensure you do not share mutable variables between actors. Otherwise, you risk race conditions.

Actor-based Background Uploads

We will now look at three simple examples that exercise the three Actor base classes in a standalone fashion.

As a use case, imagine that we want to upload data to a server in the background, and we are using an actor because we do not want the upload to block the main program's execution. Furthermore, we may want batch uploads for performance, or to limit the frequency at which this actor performs uploads to avoid overloading the server.

Simple Upload Actor

A simple actor that receives messages and uploads them could be written as follows:

import $ivy.`com.lihaoyi::castor:0.1.7`

class SimpleUploadActor()(implicit cc: castor.Context)
extends castor.SimpleActor[String]{
  var count = 0
  def run(msg: String) = {
    println(s"Uploading $msg")
    val res = requests.post("https://httpbin.org/post", data=msg)
    count += 1
    println(s"response $count ${res.statusCode} " + ujson.read(res)("data"))
  }
}

implicit val cc = new castor.Context.Test()
val uploader = new SimpleUploadActor()

This snippet defines a SimpleUploadActor class that uploads all the messages it receives to the https://httpbin.org/post endpoint. Note that we need the SimpleUploadActor class to take an implicit parameter cc: castor.Context. We instantiate it as uploader, and external code can send messages to uploader via the .send method. send returns immediately while the actor processes the incoming messages one after the other in the background.

We can test this script by loading it into the Ammonite Scala REPL for interactive use via amm --predef Simple.sc, and using {}s to send three messages to the actor in quick succession:

@ {
  println("sending hello")
  uploader.send("hello")

  println("sending world")
  uploader.send("world")

  println("sending !")
  uploader.send("!")
  }
sending hello
sending world
sending !

Uploading hello
response 1 200 "hello"
Uploading world
response 2 200 "world"
Uploading !
response 3 200 "!"

Note how all three sending messages got printed before any of the HTTP requests were performed: calls to .send are asynchronous, and queue up a message for the actor to process later. Only later do the three Uploading and response messages get printed, indicating that the three requests to the httpbin.org server were performed.

Actors vs Futures

While actors and futures are both concurrency constructs, they have very different use cases.

Streaming vs Request-Response

Using actors is ideal for pipeline parallelism scenarios where the dataflow is one way. Taking logging as an example, an application writes logs but does not need to wait for them to be processed. In contrast, the futures we learned about in Chapter 13: Fork-Join Parallelism with Futures support a more request-response usage pattern, where an asynchronous computation takes a set of input parameters and produces a single result that the application needs to wait for before it can perform further work.

For most use cases, the choice of either streaming or request-response styles is made for you. Log processing tends to be a streaming process, while HTTP handling tends to be request-response. Metrics and monitoring systems tend to fit streaming, whereas database queries tend to fit request-response. Actors and futures are complementary techniques, and which one you use to parallelize your code depends on whether a scenario fits better into a streaming or request-response style. Hybrid approaches that use both actors and futures together are also possible.

Preserving Ordering

Messages sent to an actor are always processed in the order in which the messages are sent. In contrast, computations running on futures may be scheduled and executed in arbitrary order, which may be different from the order in which the futures were created.

When processing application logs, the order of log messages needs to be preserved. In contrast, when hashing files, the order in which you hash the files probably does not matter. If the order of processing is important, using actors is the way to go.
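A small sketch of this ordering guarantee, using a hypothetical RecordActor that simply remembers the messages it was given in arrival order:

```scala
import $ivy.`com.lihaoyi::castor:0.1.7`

implicit val cc = new castor.Context.Test()

// Hypothetical actor for illustration: remembers messages in arrival order
class RecordActor()(implicit cc: castor.Context) extends castor.SimpleActor[Int]{
  var seen = Vector.empty[Int] // public only so this sketch can inspect it
  def run(msg: Int): Unit = seen :+= msg
}

val recorder = new RecordActor()
for (i <- 1 to 1000) recorder.send(i)

cc.waitForInactivity()
// The messages were processed in exactly the order they were sent
assert(recorder.seen == (1 to 1000).toVector)
```

Here all messages come from a single sending thread, so "the order in which the messages are sent" is unambiguous.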

Private Mutable State

Even in a concurrent environment with many messages being sent from different threads, each actor only processes messages in a single-threaded fashion one after the other. This means an actor can freely make use of private mutable fields without worrying about locks or thread-safety. In contrast, futures always have the possibility of running in parallel, and cannot safely access shared mutable variables without risking race conditions.

For example, the SimpleUploadActor earlier keeps track of a count of how many uploads have occurred. The actor's single-threaded execution means that count will always be incremented correctly, without race conditions or lost updates. If we want to use futures to perform our background uploads, we would need to make sure our count variable and any other mutable state can be safely accessed from multiple futures running at the same time: not impossible, but definitely tricky and easy to get wrong for anything more complicated than a single counter.
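For comparison, here is a minimal sketch of what a thread-safe counter might look like when the uploads run on futures instead. It uses only the standard library; fakeUpload is a hypothetical stand-in for the real HTTP request so that the sketch runs without network access:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the real upload; always "succeeds"
def fakeUpload(msg: String): Int = 200

// A plain `var count` with `count += 1` is a read-modify-write that can lose
// updates when futures run in parallel; AtomicInteger makes it one atomic step
val count = new AtomicInteger(0)

val uploads = for (i <- 1 to 1000) yield Future {
  fakeUpload(s"message $i")
  count.incrementAndGet()
}

// Wait for every upload to finish before reading the counter
Await.result(Future.sequence(uploads), 10.seconds)
println(count.get())
```

This works for a single counter, but every additional piece of shared state needs similar care, which is the burden that an actor's one-message-at-a-time processing removes.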

Note that the castor.Context.Test has extra instrumentation to support a .waitForInactivity() method, useful for waiting for the actors to complete their work in testing scenarios. This instrumentation has overhead, and you can use a castor.Context.Simple in your production code if you wish to avoid that overhead.

Batch Upload Actor

The fact that SimpleUploadActor uploads messages one at a time can be inefficient: we may instead want to upload as many messages as we can in each HTTP request to minimize the per-request overhead. To do so, we can use a BatchActor:

import $ivy.`com.lihaoyi::castor:0.1.7`

class BatchUploadActor()(implicit cc: castor.Context)
extends castor.BatchActor[String]{
  var responseCount = 0
  def runBatch(msgs: Seq[String]) = {
    println("Uploading " + msgs.mkString)
    val res = requests.post("https://httpbin.org/post", data = msgs.mkString)
    responseCount += 1
    println(s"response ${res.statusCode} " + ujson.read(res)("data"))
  }
}

implicit val cc = new castor.Context.Test()
val batchUploader = new BatchUploadActor()

Now, if we send multiple messages in quick succession, the BatchUploadActor calls .mkString to concatenate them together and only performs one HTTP POST:

@ {
  println("sending hello")
  batchUploader.send("hello")

  println("sending world")
  batchUploader.send("world")

  println("sending !")
  batchUploader.send("!")
  }
sending hello
sending world
sending !

Uploading helloworld!
response 200 "helloworld!"

If further messages get sent to the BatchActor while the initial batch upload is taking place, they too are batched together and ready for the next batch upload. Essentially, every message that is received while a previous runBatch invocation is executing is batched together for the next invocation: this can be non-deterministic, and depends on thread scheduling, CPU load, networking, and many other factors.

Note that when extending BatchActor, it is up to the implementer to ensure that the BatchActor's runBatch method has the same visible effect as if they had run a single run method on each message individually. Violating that assumption may lead to confusing bugs where the actor behaves non-deterministically depending on how the messages are batched.
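To make this requirement concrete, here is a small sketch using a hypothetical TotalActor. Its runBatch produces the same final state no matter how the incoming messages happen to be grouped into batches:

```scala
import $ivy.`com.lihaoyi::castor:0.1.7`

implicit val cc = new castor.Context.Test()

// Hypothetical actor for illustration: keeps a running total and message count
class TotalActor()(implicit cc: castor.Context) extends castor.BatchActor[Int]{
  var total = 0    // public only so this sketch can inspect the result
  var messages = 0
  def runBatch(msgs: Seq[Int]): Unit = {
    // Safe: the same end result as handling each message individually
    total += msgs.sum
    messages += msgs.length
    // Unsafe would be e.g. `messages += 1`, which counts batches, so the
    // result would vary with how the messages happened to be grouped
  }
}

val totals = new TotalActor()
for (i <- 1 to 100) totals.send(i)

cc.waitForInactivity()
println(s"${totals.total} from ${totals.messages} messages")
```

The number of runBatch invocations may differ from run to run, but total and messages always end up the same.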

State Machine Upload Actor

Let us consider one more requirement: rather than sending batches of HTTP requests back to back, we would instead like to send a request at most once every N seconds. This is often called throttling, and is a common requirement to avoid overloading the remote server.

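The easiest way to implement this is to define a state machine with two states, Idle and Buffering. In Idle, an incoming message is uploaded immediately and the actor moves to Buffering. In Buffering, incoming messages are appended to a buffer. After N seconds of buffering, the actor either uploads the buffered messages as one batch and keeps buffering, or, if nothing was buffered, returns to Idle.

The following snippet defines a StateMachineUploadActor that implements this protocol: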

import $ivy.`com.lihaoyi::castor:0.1.7`

sealed trait Msg
case class Text(s: String) extends Msg
case class Flush() extends Msg

class StateMachineUploadActor(n: Int)(implicit cc: castor.Context)
extends castor.StateMachineActor[Msg]{
  var responseCount = 0
  def initialState = Idle()
  case class Idle() extends State({
    case Text(msg) => upload(msg)
  })
  case class Buffering(msgs: Vector[String]) extends State({
    case Text(s) => Buffering(msgs :+ s)
    case Flush() =>
      if (msgs.isEmpty) Idle()
      else upload(msgs.mkString)
  })
  def upload(data: String) = {
    println("Uploading " + data)
    val res = requests.post("https://httpbin.org/post", data=data)
    responseCount += 1
    println(s"response ${res.statusCode} " + ujson.read(res)("data"))
    cc.scheduleMsg(this, Flush(), java.time.Duration.ofSeconds(n))
    Buffering(Vector.empty)
  }
}

implicit val cc = new castor.Context.Test()
val stateMachineUploader = new StateMachineUploadActor(n = 5)

This code snippet is somewhat more complex than what we saw earlier: rather than just receiving raw Strings, StateMachineUploadActor instead receives Msgs, which are either Text or Flush objects. The actor also has two states, Idle and Buffering, each of which pattern matches on incoming messages to decide what action to perform as well as what next state to transition into.

The implementation of this actor matches almost exactly the state machine we described above. The only subtlety is that the "after N seconds of buffering" logic is implemented via cc.scheduleMsg: this means the actor will receive the Flush() message N seconds after uploading a batch of messages and transitioning to Buffering, giving it a chance to either upload any buffered messages or transition back to Idle.

We can test this logic by sending messages to StateMachineUploadActor in the REPL. The first message we send gets uploaded immediately, while subsequent messages are buffered according to our N second rule:

@ stateMachineUploader.send(Text("I am Cow"))

Uploading I am Cow
response 200 "I am Cow"

@ stateMachineUploader.send(Text("Hear me moo"))

@ stateMachineUploader.send(Text("I weigh twice as much as you"))

Uploading Hear me mooI weigh twice as much as you
response 200 "Hear me mooI weigh twice as much as you"

@ stateMachineUploader.send(Text("And I look good on the barbecue"))

Uploading And I look good on the barbecue
response 200 "And I look good on the barbecue"

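In general, StateMachineActor is very useful in cases where there are multiple distinct states which an actor can be in. It forces you to explicitly define the states, the data held in each state, and the transition that occurs when each state receives each message.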

When the number of distinct states and messages is large, StateMachineActor can be easier to manage than a SimpleActor with many mutable vars inside of it.

Note that while multiple threads can send messages to stateMachineUploader at once, and the Flush message can also be sent at an arbitrary time in the future, the actor will only ever process one message at a time. This ensures that it will transition between the two states Idle and Buffering in a straightforward manner, without needing to worry about race conditions when trying to update the internal state of the actor.

Concurrent Logging Pipelines

We will now work through a slightly more advanced example: using actors to build a concurrent logging pipeline. This logging pipeline will receive logs from an application and process them in the background without needing the application to stop and wait for it.

We will start with a single actor logging to disk, and extend it to form a multi-stage concurrent logging pipeline logging its messages to multiple destinations.

Unlike the simple HTTP upload actor we saw earlier, our logging actors will have to deal with concerns such as serializing writes to a log file, log rotation, and pipelining to run different parts of the logging logic in parallel. We will also see how to test your actors programmatically in a simple and deterministic way, using castor.Context.Test. We will be using Ammonite Scala Scripts to implement and test the rest of the examples in this chapter.

A Logging SimpleActor

Here is a small demonstration of using a castor.SimpleActor to perform asynchronous logging to disk:

// Classes.sc
import $ivy.`com.lihaoyi::castor:0.1.7`
class DiskActor(logPath: os.Path, rotateSize: Int = 50)
               (implicit cc: castor.Context) extends castor.SimpleActor[String]{
  val oldPath = logPath / os.up / (logPath.last + "-old")
  def run(s: String) = {
    val newLogSize = logSize + s.length + 1
    if (newLogSize <= rotateSize) logSize = newLogSize
    else { // rotate log file by moving it to old path and starting again from empty
      logSize = s.length + 1
      os.move(logPath, oldPath, replaceExisting = true)
    }
    os.write.append(logPath, s + "\n", createFolders = true)
  }
  private var logSize = 0
}

// LoggingPipeline.sc
import $file.Classes, Classes._

implicit val cc = new castor.Context.Test()

val diskActor = new DiskActor(os.pwd / "log.txt")

val logger = diskActor

We alias diskActor under the name logger for use by application code; this will simplify subsequent examples. To test this DiskActor, we will use a separate TestLoggingPipeline.sc script that imports logger and cc from LoggingPipeline.sc, sends messages, and asserts on the resulting files.

DiskActor doesn't just write to a log file: the actor also monitors the size of the file, and when it crosses a threshold archives it and starts from a new empty log file. This is called "log rotation", and is a common requirement when handling logs to avoid log files growing indefinitely and filling up your disk.

We can test this using the following script, which we can run via amm TestLoggingPipeline.sc.

import $file.LoggingPipeline, LoggingPipeline.{logger, cc}

logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weigh twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")

// Logger hasn't finished yet, running in the background
cc.waitForInactivity()
// Now logger has finished

assert(os.read.lines(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
assert(
  os.read.lines(os.pwd / "log.txt") ==
  Seq("I am cow, I am cow", "Hear me moo, moooo")
)
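To see why those assertions hold, it can help to replay the rotation arithmetic without touching the disk. The simulate function below is a hypothetical in-memory model of DiskActor's logic, not part of the chapter's scripts: it tracks the archived lines, the current lines, and the current size, using the same rule of "line length plus one for the newline, rotate when the new size would exceed rotateSize":

```scala
// Pure model of DiskActor: returns (lines in log.txt-old, lines in log.txt)
def simulate(msgs: Seq[String], rotateSize: Int = 50): (Vector[String], Vector[String]) = {
  val start = (Vector.empty[String], Vector.empty[String], 0)
  val (old, current, _) = msgs.foldLeft(start) {
    case ((old, current, size), s) =>
      val newSize = size + s.length + 1 // +1 for the trailing newline
      if (newSize <= rotateSize) (old, current :+ s, newSize)
      else (current, Vector(s), s.length + 1) // rotate: current log becomes the old log
  }
  (old, current)
}

val (oldLog, newLog) = simulate(Seq(
  "I am cow",
  "hear me moo",
  "I weigh twice as much as you",
  "And I look good on the barbecue",
  "Yoghurt curds cream cheese and butter",
  "Comes from liquids from my udder",
  "I am cow, I am cow",
  "Hear me moo, moooo"
))

println(oldLog)
println(newLog)
```

The last rotation happens when "I am cow, I am cow" arrives: the log already holds the 33 bytes of "Comes from liquids from my udder", and adding 19 more would reach 52, which exceeds the limit of 50. That line is therefore the only content of log.txt-old, and the final two messages share the new log.txt.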

Note that logger.send is thread-safe: multiple threads can be sending messages to the logger at once, and the messages will be queued up and executed one at a time. Even if logger is in the middle of writing to disk, or is currently performing a log-rotation, the fact that it's in a separate actor means the processing happens in the background without slowing down the main logic of your program.

Multi-stage Actor Pipelines

Actors give you pipelined parallelism when processing data: the ability to feed your messages through multiple stages of processing, with each stage's processing occurring in parallel. In the following example, we add a base64Actor to form a two-stage pipeline:

application -> base64Actor -> diskActor -> log.txt

diskActor handles the same writing-strings-to-disk-and-rotating-log-files logic we saw earlier, while base64Actor adds another step of encoding the data before it gets written to disk:

// Classes.sc
 import $ivy.`com.lihaoyi::castor:0.1.7`
 class DiskActor...

+class Base64Actor(dest: castor.Actor[String])
+                 (implicit cc: castor.Context) extends castor.SimpleActor[String]{
+  def run(msg: String) = {
+    dest.send(java.util.Base64.getEncoder.encodeToString(msg.getBytes))
+  }
+}
// LoggingPipeline.sc
 implicit val cc = new castor.Context.Test()

 val diskActor = new DiskActor(os.pwd / "log.txt", rotateSize = 50)
+val base64Actor = new Base64Actor(diskActor)
-val logger = diskActor
+val logger = base64Actor

Although we have added another Base64 encoding step to the logging process, this new step lives in a separate actor from the original write-to-disk step, and both of these can run in parallel with each other as well as in parallel with the main application code.

We can modify TestLoggingPipeline.sc to verify that it writes lines to the log file base64-encoded, and that when decoded the contents are what we expect:

// TestLoggingPipeline.sc
 cc.waitForInactivity()

-assert(os.read.lines(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
-assert(
-  os.read.lines(os.pwd / "log.txt") ==
-  Seq("I am cow, I am cow", "Hear me moo, moooo")
-)
+def decodeFile(p: os.Path) = {
+  os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
+}
+assert(decodeFile(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
+assert(decodeFile(os.pwd / "log.txt") == Seq("I am cow, I am cow", "Hear me moo, moooo"))

Non-Linear Pipelines

Actor pipelines are not limited to two stages, nor are they limited to a single linear sequence. For the last example in this chapter, let us now consider the following 4 actors arranged in a T-shaped pipeline:

application -> sanitizeActor -> base64Actor -> uploadActor -> httpbin.org
base64Actor -> diskActor -> log.txt

To implement this pipeline, we can modify LoggingPipeline.sc as follows:

// Classes.sc
 class DiskActor...
 class Base64Actor...
+class UploadActor(url: String)
+                 (implicit cc: castor.Context) extends castor.SimpleActor[String]{
+  def run(msg: String) = {
+    val res = requests.post(url, data = msg)
+    println(s"response ${res.statusCode} " + ujson.read(res)("data"))
+  }
+}
+class SanitizeActor(dest: castor.Actor[String])
+                   (implicit cc: castor.Context) extends castor.SimpleActor[String]{
+  def run(msg: String) = {
+    dest.send(msg.replaceAll("([0-9]{4})[0-9]{8}([0-9]{4})", "<redacted>"))
+  }
+}
// LoggingPipeline.sc
 implicit val cc = new castor.Context.Test()

 val diskActor = new DiskActor(os.pwd / "log.txt")
+val uploadActor = new UploadActor("https://httpbin.org/post")
-val base64Actor = new Base64Actor(diskActor)
+val base64Actor = new Base64Actor(new castor.SplitActor(diskActor, uploadActor))
+val sanitizeActor = new SanitizeActor(base64Actor)
-val logger = base64Actor
+val logger = sanitizeActor

Apart from the new additions of uploadActor and sanitizeActor, we also use a castor.SplitActor to take the output of base64Actor and send it to two downstream destinations. SplitActor can be used to dispatch messages to any number of downstream actors.
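As a standalone sketch of SplitActor's fan-out behavior, the following script wires two hypothetical SinkActors behind a splitter and checks that each one receives every message. SinkActor exists only for this illustration:

```scala
import $ivy.`com.lihaoyi::castor:0.1.7`

implicit val cc = new castor.Context.Test()

// Hypothetical actor for illustration: remembers the messages it receives
class SinkActor()(implicit cc: castor.Context) extends castor.SimpleActor[String]{
  var seen = Vector.empty[String] // public only so this sketch can inspect it
  def run(msg: String): Unit = seen :+= msg
}

val left = new SinkActor()
val right = new SinkActor()
val splitter = new castor.SplitActor(left, right)

splitter.send("hello")
splitter.send("world")

cc.waitForInactivity()
println(left.seen)
println(right.seen)
```

Each downstream actor gets its own copy of the message stream and processes it independently of the other.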

Now, if we modify our TestLoggingPipeline.sc script to also send a 16-digit credit-card-like number as part of the logging message, we can see that it gets replaced by <redacted> in the base64 logged output:

// TestLoggingPipeline.sc
 logger.send("Comes from liquids from my udder")
-logger.send("I am cow, I am cow")
+logger.send("I am cow1234567887654321")
 logger.send("Hear me moo, moooo")

 cc.waitForInactivity()

 def decodeFile(p: os.Path) = {
   os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
 }
 assert(decodeFile(os.pwd / "log.txt-old") == Seq("Comes from liquids from my udder"))
-assert(decodeFile(os.pwd / "log.txt") == Seq("I am cow, I am cow", "Hear me moo, moooo"))
+assert(decodeFile(os.pwd / "log.txt") == Seq("I am cow<redacted>", "Hear me moo, moooo"))

You will also see it print out the response 200 ... messages as the log messages are uploaded to the https://httpbin.org/post HTTP endpoint.
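The redaction rule itself can be checked in isolation, without any actors. The sanitize helper below is a hypothetical wrapper around the same replaceAll call that SanitizeActor makes, used here to show which inputs match:

```scala
// The same pattern SanitizeActor uses: 4 digits, then 8 digits, then 4 digits
def sanitize(msg: String): String =
  msg.replaceAll("([0-9]{4})[0-9]{8}([0-9]{4})", "<redacted>")

val redacted  = sanitize("I am cow1234567887654321")  // 16 digits: replaced
val untouched = sanitize("I am cow123456788765432")   // 15 digits: no match
val partial   = sanitize("I am cow12345678876543219") // 17 digits: first 16 replaced

println(Seq(redacted, untouched, partial).mkString("\n"))
```

Note that the pattern matches any run of 16 consecutive digits, so in a longer digit string only the first 16 digits are replaced and the remainder is left in place.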

The messages we send to logger are processed with pipeline parallelism on the four actors: we can have one message being sanitized, another being base64 encoded, a third being uploaded, and a fourth being written to disk, all happening simultaneously. We gain this parallelism while preserving the order in which messages are processed, ensuring that our HTTP endpoint and log files receive the messages in the exact same order that they were originally sent in.

Any of the SimpleActors in this pipeline could also be replaced by BatchActors or StateMachineActors to improve performance or to implement additional functionality: e.g. batching writes to disk, batching HTTP uploads, or adding throttling. Doing so is left as an exercise to the reader.

Re-arranging Actor Pipelines

The four actors in our last pipeline are defined as classes, with each class constructor taking a castor.Actor[...] reference. Defining our actors in this way gives us flexibility in how we want to arrange our pipeline: each actor doesn't need to know about the details of the other actors it is interacting with. It only needs to know what message type it expects to receive and the message type of the downstream actors it needs to send messages to.

For example, if we wanted to re-configure our 4-node pipeline to run without sanitizing credit card numbers, it is easy to remove sanitizeActor from the pipeline:

import $file.Classes, Classes._

implicit val cc = new castor.Context.Test()

val diskActor = new DiskActor(os.pwd / "log.txt")
val uploadActor = new UploadActor("https://httpbin.org/post")
val base64Actor = new Base64Actor(new castor.SplitActor(diskActor, uploadActor))

val logger = base64Actor

application -> base64Actor -> uploadActor -> httpbin.org
base64Actor -> diskActor -> log.txt

What if we wanted only the file logging to be base64 encoded, and only the HTTP logging to be sanitized? Again, it is straightforward to re-configure our actor pipeline to do this:

import $file.Classes, Classes._

implicit val cc = new castor.Context.Test()

val diskActor = new DiskActor(os.pwd / "log.txt")
val uploadActor = new UploadActor("https://httpbin.org/post")

val base64Actor = new Base64Actor(diskActor)
val sanitizeActor = new SanitizeActor(uploadActor)

val logger = new castor.SplitActor(base64Actor, sanitizeActor)

application -> sanitizeActor -> uploadActor -> httpbin.org
application -> base64Actor -> diskActor -> log.txt

As you can see, using actors to model your data processing pipelines allows a great deal of flexibility in how your pipelines will be laid out. Without any change to the implementation of individual actors, we have reconfigured our concurrent logging pipeline to support 4 very different use cases. It only took a tiny change in how the actors were instantiated to completely re-architect how the data flows through our system.

This flexibility to arrange and re-arrange your actor pipelines also makes it easy to test parts of the pipeline in isolation, or to re-use parts of the pipeline in different scenarios with different requirements.
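For instance, a sketch of testing Base64Actor on its own might replace the downstream DiskActor with a test double. RecordingActor is a hypothetical helper introduced for this illustration; Base64Actor is repeated from Classes.sc so that the script is self-contained:

```scala
import $ivy.`com.lihaoyi::castor:0.1.7`

implicit val cc = new castor.Context.Test()

// Same Base64Actor as in Classes.sc, repeated to keep the sketch self-contained
class Base64Actor(dest: castor.Actor[String])
                 (implicit cc: castor.Context) extends castor.SimpleActor[String]{
  def run(msg: String): Unit = {
    dest.send(java.util.Base64.getEncoder.encodeToString(msg.getBytes))
  }
}

// Hypothetical test double: records messages instead of writing them to disk
class RecordingActor()(implicit cc: castor.Context) extends castor.SimpleActor[String]{
  var received = Vector.empty[String] // public only so the test can inspect it
  def run(msg: String): Unit = received :+= msg
}

val recorder = new RecordingActor()
val base64Actor = new Base64Actor(recorder)

base64Actor.send("I am cow")
cc.waitForInactivity()
println(recorder.received)
```

Since Base64Actor only knows that its destination is a castor.Actor[String], it cannot tell the difference between the recorder and the real disk-writing actor.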

Debugging Actors

Lastly, let us look at a few techniques for debugging what an actor is doing. These will come in handy when your actor-based code inevitably misbehaves!

Debug Logging State Machines

When using StateMachineActor, all your actor's internal state should be in the single state variable. You can thus easily override def run to print the state before and after each message is received:

override def run(msg: Msg): Unit = {
  println(s"$state + $msg -> ")
  super.run(msg)
  println(state)
}

If your StateMachineActor is misbehaving, this should hopefully make it easier to trace what it is doing in response to each message, so you can figure out exactly why it is misbehaving. Here is the logging of the StateMachineUploadActor, where the logging prints out how the actor handles messages and transitions between states:

stateMachineUploader.send(Text("I am cow"))
// Idle() + Text(I am cow) ->
// Buffering(Vector(I am cow))
stateMachineUploader.send(Text("hear me moo"))
// Buffering(Vector(I am cow)) + Text(hear me moo) ->
// Buffering(Vector(I am cow, hear me moo))
Thread.sleep(100)
// Buffering(Vector(I am cow, hear me moo)) + Flush() ->
// Idle()

Logging every message received and processed by one or more actors may get very verbose in a large system. You can use a conditional if (...) in your override def run to specify exactly which state transitions on which actors you care about (e.g. only actors handling a certain user ID) to cut down the noise:

override def run(msg: Msg): Unit = {
  if (...) println(s"$state + $msg -> ")
  super.run(msg)
  if (...) println(state)
}

Note that if you have multiple actors sending messages to each other, by default they run on a thread pool and so the println messages above may become interleaved and hard to read. To resolve that, you can try running actors single threaded.

Running Actors Single Threaded

Another debugging strategy is to replace the castor.Context executor with a single-threaded executor. This can help our actor pipeline behave more deterministically:

implicit val cc = new castor.Context.TestThreadPool(1)

Any actor pipeline should be able to run on a single threaded executor. This makes it easier to track down logical bugs without multithreaded parallelism getting in the way.

Debugging using Context Logging

Apart from logging individual actors, you can also insert logging into the castor.Context to log state transitions or actions across every actor. For example, you can log every time a message is run on an actor by overriding the reportRun callback:

implicit val cc = new castor.Context.Test() {
  override def reportRun(a: castor.Actor[_],
                         msg: Any,
                         token: castor.Context.Token): Unit = {
    println(s"$a <- $msg")
    super.reportRun(a, msg, token)
  }
}

Running this on the four-actor pipeline example from earlier, we can see the logging messages get interleaved as the different actors all run in parallel.

SanitizeActor@5ad26966 <- I am cow
SanitizeActor@5ad26966 <- hear me moo
Base64Actor@5578b956 <- I am cow
SanitizeActor@5ad26966 <- I weigh twice as much as you
SanitizeActor@5ad26966 <- And I look good on the barbecue
Base64Actor@5578b956 <- hear me moo
SanitizeActor@5ad26966 <- Yoghurt curds cream cheese and butter
castor.SplitActor@7cdcd738 <- SSBhbSBjb3c=
DiskActor@7aada8fd <- SSBhbSBjb3c=
SanitizeActor@5ad26966 <- Comes from liquids from my udder
UploadActor@775713fd <- SSBhbSBjb3c=

By instrumenting the castor.Context, we can see the messages that are being sent and state transitions that are happening to all actors within our program. That can help greatly when you are not sure exactly which actor is the one that is misbehaving, and helps us visualize what our group of actors is doing. We can simplify the logging even further by also running the actors single threaded, as described in Running Actors Single Threaded.

Conclusion

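In this chapter, we have seen how to structure our code using actors. They allow us to process data concurrently, similar to what we did in Chapter 13: Fork-Join Parallelism with Futures, but with different tradeoffs: actors suit one-way streaming pipelines rather than request-response computations, they process messages in the order in which they were sent, and they process one message at a time, which makes private mutable state safe to use.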

We have seen how we can easily construct actor pipelines of varying shape, where each stage of the pipeline processes the incoming messages in parallel, all without needing to deal with threads and locks ourselves.

Actors are a fundamental model of parallel computation, that together with the fork-join style provided by futures, is a valuable addition to your toolbox. Both models have their strengths and weaknesses, and Scala makes it very easy to pick the one that best fits the problem at hand.

This chapter makes use of the Castor actor library, whose documentation is available online.

In the wild you may encounter other projects using the Akka actor framework. This is a much larger and more complex framework than Castor, with much more to learn, but all the same concepts still apply.

We will be making heavy use of actors and actor pipelines later in Chapter 18: Building a Real-time File Synchronizer.

Exercise 9 - WebCrawler

Use a single actor to implement an asynchronous web crawler using the same fetchLinksAsync method we saw in Chapter 13: Fork-Join Parallelism with Futures, but without the batch-by-batch limitation. The result of each HTTP request should be processed immediately once that request completes, without waiting for all other requests in the same "batch", so that a single long-running request does not hold up the entire crawler. You will likely need to use the asynchronous Future operations together with the sendAsync method to integrate your actors with fetchLinksAsync's futures.

Exercise 10 - WebCrawlerPipeline

Combine the actor-based web crawler you wrote in the above exercise with the DiskActor we saw earlier in the chapter, to stream the crawled results to a file on disk in a simple pipeline.

Exercise 11 - WebCrawlerThrottled

Add throttling to the actor-based web crawler above, to ensure it does not make more than a configurable maxConcurrent: Int open HTTP requests at a time.



Updated 2020-12-02