Using Scala and Akka with Domain-Driven Design

If you’ve been following my theme for a while, you know that I’ve spent considerable effort promoting Scala and Akka for use when Implementing Domain-Driven Design. Few seem to share my vision of a completely explicit use of Actors as Aggregates, with direct message sending between clients and the Aggregates (where the “client” may be, for example, the User Interface or another Aggregate).

NOTE: There is a follow-up to this post in which I address the problems found herein.

More recently there have been numerous suggestions to place various wrappers around the domain model’s Aggregates. This smells of an Application Layer and its Application Services, or at least of a named cache/region such as is used in Coherence or GemFire. That’s absolutely not what I want. Sure, those mechanisms could exist behind the scenes, but not between a client and the Aggregate instance (an Actor) that it wants to communicate with. Putting them in between makes me really unhappy. The main point of my emphasis on the Actor Model in the first place was to get rid of all the extraneous architecture layers that cloud our vision of what the core of the application, its domain model, is really doing.

Rather than abandon Akka as a means to employ the Actor Model with DDD, what I did over the past few hours was prototype a few useful abstractions as a really simple set of Scala classes that could be used to implement my vision. So, here you go.

It seems appropriate to me that the basic interface to the desired functionality should be a DomainModel. Here’s a simple class and companion object:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._

object DomainModel {
  def apply(name: String): DomainModel = new DomainModel(name)
}

class DomainModel(name: String) {
  // Registered Aggregate types, keyed by type name
  val aggregateTypeRegistry = mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  // Creates the Aggregate Actor and returns a reference that routes through the type's cache
  def aggregateOf(typeName: String, props: Props, id: String): AggregateRef = {
    aggregateTypeRegistry.get(typeName) match {
      case Some(aggregateType) =>
        // Prototype shortcut: the worker calls context.actorOf from the caller's thread,
        // not from within the cache Actor
        aggregateType.worker.aggregateOf(props, id)
        AggregateRef(id, aggregateType.actor)
      case None =>
        // Prototype shortcut: an unregistered type yields a reference holding nulls
        AggregateRef(null, null)
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      implicit val timeout: Timeout = Timeout(5.seconds)
      // Block until the cache Actor hands back its worker
      val future = actorRef ? ProvideWorker
      val worker = Await.result(future, timeout.duration).asInstanceOf[AggregateCacheWorker]
      aggregateTypeRegistry(typeName) = AggregateType(worker, actorRef)
    }
  }

  def shutdown(): Unit = {
    system.shutdown()
  }
}

After your DomainModel is initialized, the idea is to register the various Aggregate types using registerAggregateType(). Once you have registered the types that are appropriate for your Bounded Context, you can start creating instances of them using aggregateOf(). Here you pass the type name of the Aggregate that you want to create, the Props of the Akka Actor, and the id of the Aggregate. The id is both the name of the Actor and the globally unique identity of the Aggregate instance being created. Finally, if your application needs to be shut down, you must call the DomainModel shutdown() function.
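
The register-then-create contract can be sketched without Akka at all. What follows is only a model, not part of the prototype: RegistryModel and Handle are hypothetical names, and returning an Option for an unregistered type is one possible alternative to handing back a reference that holds nulls.

```scala
import scala.collection.mutable

// Hypothetical handle returned to clients; it stands in for AggregateRef.
final case class Handle(typeName: String, id: String)

// Hypothetical, Akka-free model of the register-then-create contract.
final class RegistryModel {
  private val registeredTypes = mutable.Set.empty[String]

  // Registering the same type twice has no further effect.
  def registerAggregateType(typeName: String): Unit = {
    registeredTypes += typeName
    ()
  }

  // Some(handle) for a registered type, None for an unregistered one.
  def aggregateOf(typeName: String, id: String): Option[Handle] =
    if (registeredTypes.contains(typeName)) Some(Handle(typeName, id)) else None
}
```

With this shape a client has to deal with the unregistered case explicitly, for example by pattern matching on the result of aggregateOf("Order", "123").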

Now, what is the reason for registering Aggregate types? This serves a few different purposes. First of all, there is an underlying Actor that implements a special node-specific cache. This cache Actor will serve as the parent of all Aggregate Actors within the registered type. All messages that will be sent to the Actor that implements a specific Aggregate instance must pass through the cache Actor. Here’s the class that serves as the parent of all Aggregate instances within a specified type, or named cache:

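import akka.actor.Actor

class AggregateCache(typeName: String) extends Actor {
  val worker = new AggregateCacheWorker(context)

  def receive = {
    case message: CacheMessage =>
      // Use the in-memory child if there is one; otherwise re-create it from its registered Props
      val aggregate = context.child(message.id).getOrElse {
        context.actorOf(worker.propsFor(message.id), message.id)
      }
      // Forward the wrapped message, preserving the original sender
      aggregate.tell(message.actualMessage, message.sender)

    case ProvideWorker =>
      // Hand the worker to the DomainModel (see registerAggregateType)
      sender ! worker
  }
}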

As you can see, the main functionality of the AggregateCache is to look up child Aggregate Actors and dispatch messages to them. If the lookup fails it means that the Aggregate Actor instance is no longer in memory. This implies that Aggregates can be transient; that is, they may exist in memory for some period of time, and then after some degree of inactivity they can be closed and removed from memory. However, if a message is sent to the Aggregate Actor by a client, the client needs to be assured that the Aggregate will receive the message no matter what. This is where the getOrElse expression comes into play. The Aggregate Actor will be dynamically reloaded, and then the AggregateCache can dispatch the message to it.
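
To make the lookup-or-reload step concrete, here is a minimal Akka-free model of it. Everything in it is hypothetical: LookupModel plays the role of the cache, its internal map plays the role of context.child, and evict() simulates an idle Aggregate being removed from memory.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an Aggregate Actor; it only counts deliveries.
final class ModelAggregate(val id: String) {
  var deliveries = 0
  def deliver(message: Any): Unit = deliveries += 1
}

// Hypothetical, Akka-free model of the cache's lookup-or-reload step.
final class LookupModel {
  // Plays the role of context.child: the instances currently in memory.
  private val inMemory = mutable.Map.empty[String, ModelAggregate]

  // How many times an instance had to be created or re-created.
  var reloads = 0

  def dispatch(id: String, message: Any): Unit = {
    val aggregate = inMemory.getOrElse(id, {
      reloads += 1
      val created = new ModelAggregate(id)
      inMemory(id) = created
      created
    })
    aggregate.deliver(message)
  }

  // Simulates an idle Aggregate being removed from memory.
  def evict(id: String): Unit = {
    inMemory.remove(id)
    ()
  }

  def isInMemory(id: String): Boolean = inMemory.contains(id)
}
```

Dispatching to an evicted id simply creates a fresh instance, so the sender never has to know whether the target was in memory. The model does not restore any state on reload; that concern is outside this sketch.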

The AggregateCache Actor uses an internal worker object:

import akka.actor.{ActorContext, ActorRef, Props}

class AggregateCacheWorker(context: ActorContext) {
  // Props for every Aggregate created so far, keyed by Aggregate id
  val aggregateInfo = scala.collection.mutable.Map[String, Props]()

  // Creates a new Aggregate Actor as a child of the cache and remembers its Props
  def aggregateOf(props: Props, id: String): ActorRef = {
    if (!aggregateInfo.contains(id)) {
      aggregateInfo(id) = props
      context.actorOf(props, id)
    } else {
      throw new IllegalStateException(s"Aggregate with id $id already exists")
    }
  }

  // Returns the Props needed to re-create an Aggregate Actor that is no longer in memory
  def propsFor(id: String): Props = {
    aggregateInfo.getOrElse(id,
      throw new IllegalStateException(s"No Props for aggregate of id $id"))
  }
}

The AggregateCacheWorker handles support work that is needed by the DomainModel and AggregateCache. This includes creating new Aggregate instances by way of aggregateOf(), and also providing the Props instance, via propsFor(), for each Aggregate Actor that may need to be dynamically reloaded.

Finally, there are a few simple support classes:

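import akka.actor.ActorRef

case class AggregateRef(id: String, cache: ActorRef) {
  // Wraps the message with the Aggregate id so the cache can route it;
  // a null sender means "no sender"
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  // Operator alias for tell()
  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    tell(message)(sender)
  }
}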

case class AggregateType(worker: AggregateCacheWorker, actor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case object ProvideWorker

The AggregateRef is a really important abstraction. Rather than returning an ActorRef when an Aggregate Actor is created, the DomainModel instead returns an AggregateRef. As you can see from its two functions, the AggregateRef knows how to send messages to the underlying Aggregate Actor. However, it’s not a “straight shot,” so to speak, as it is with ActorRef. Instead, all messages sent to Aggregates via AggregateRef’s tell() are first sent to the AggregateCache. This allows for look up and any necessary dynamic loading of the Aggregate Actors into the cache. Messages are sent to AggregateCache as a CacheMessage instance.
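
The envelope idea behind AggregateRef can also be shown in isolation. The sketch below is an Akka-free model under stated assumptions: Envelope, RecordingCache, and ModelRef are hypothetical names, and the sender is reduced to an optional string label instead of an ActorRef.

```scala
import scala.collection.mutable

// Hypothetical envelope; it mirrors the shape of CacheMessage.
final case class Envelope(id: String, actualMessage: Any, sender: Option[String])

// Hypothetical cache that only records what it is asked to route.
final class RecordingCache {
  val delivered = mutable.ListBuffer.empty[(String, Any, Option[String])]

  def receive(envelope: Envelope): Unit =
    delivered += ((envelope.id, envelope.actualMessage, envelope.sender))
}

// Hypothetical client-side reference; it stands in for AggregateRef.
final case class ModelRef(id: String, cache: RecordingCache) {
  // Every send is wrapped with the target id and handed to the cache.
  def tell(message: Any, sender: Option[String] = None): Unit =
    cache.receive(Envelope(id, message, sender))

  // Operator alias for tell() without a sender.
  def !(message: Any): Unit = tell(message)
}
```

The client only ever holds a ModelRef, yet each message reaches the cache tagged with the Aggregate id and the original sender, which is what lets the cache do its lookup out of the client’s view.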

So, how does it work? Here’s a runner for the prototype:

import akka.actor.{Actor, Props}

class Order(id: String, amount: Double) extends Actor {
  def receive = {
    case p: ProcessOrder =>
      println(s"Processing Order is $p")
      DomainModelPrototype.completedStep()
    case other =>
      println(s"message is $other")
      DomainModelPrototype.completedStep()
  }
}

case class ProcessOrder()

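// CompletableApp is not shown in this post; it is expected to supply awaitCompletion() and completedStep()
object DomainModelPrototype extends CompletableApp(1) {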

  val model = DomainModel("prototype")

  model.registerAggregateType("Order")

  val order = model.aggregateOf("Order", Props(new Order("123", 249.95)), "123")

  order ! ProcessOrder()

  awaitCompletion()

  model.shutdown()

  println("DomainModelPrototype: is completed.")
}

This way the Order Aggregate Actor can receive messages as if directly from a client. This is not actually a direct message send in the sense that ActorRef manages. Even so, ActorRef’s message sends are also not as “direct” as you might like to think. So by means of DomainModel and AggregateRef, the only abstractions that must be exposed to the client, we get a smooth and seamless interface to Aggregate Actors. Even better, these few abstractions address the way Akka needs to work, but cover over the particulars.

NOTE: If you think the above code is a little hacky, you would be correct. I made some tradeoffs just to express my vision in Scala and Akka without getting bogged down in laborious details. The little bit of code should be revisited for some cleanup. It also still needs a bit more detail for handling distributed caches, etc. Right now it’s just a prototype. What’s good about revisiting the code is that the corrections end up being a good learning example.

Thanks to the Akka Team, including Roland Kuhn and Patrik Nordwall, for ideas about making Akka happy. Their advice was to use some sort of intermediary to dynamically look up child Aggregate Actors, which I implemented as AggregateCache. Yet, it is my use of AggregateRef that allows the intermediary to be removed from the view of clients.

Anyway, I’ve been beating this drum for a while. I feel good that I finally had a few hours to hammer out a prototype solution for this specific set of problems. There is still a large body of work that I have accomplished around DDD with Scala and Akka that I will be presenting ASAP.