DDD with Scala and Akka Revisited

There were a number of issues with the code I wrote in the post Using Scala and Akka with Domain-Driven Design. I am addressing those problems in this post and cleaning up the code. The good thing about revisiting this topic is that you can learn from the problems found in the previous post, and how to fix them.

Share Nothing

A major problem in the previous post is that I shared part of an Actor’s internal state with the outside.Recall that one of the basic rules of Actor Model is to share nothing. Oops.

The AggregateCache used a helper named AggregateCacheWorker. The existence of this class in itself is not a problem, especially when it is acting in behalf of the AggregateCache. The problem is that the DomainModel can send a ProvideWorker message to the AggregateCache to request a reference to its internal AggregateCacheWorker. That’s just wrong. The DomainModel should not be able to obtain a reference to any part of the state of AggregateCache.

So how can this problem be corrected? The main clue is to ask: Why did the DomainModel imagine it needed to use the AggregateCacheWorker? The reason is that the DomainModel needed to delegate Aggregate Actor creation to the AggregateCacheWorker because the AggregateCacheWorker holds the ActorContext needed to create a child of the AggregateCache. Yet, this is clearly not the way we should attempt to create new Aggregate Actors.

The solution to the problem actually already exists in the AggregateCache itself. Recall that when a message is sent from a client to an Aggregate Actor, the AggregateCache will check to see if that specific child Actor is currently in memory or not. If the Actor is not currently in memory, the AggregateCache creates the child Aggregate Actor dynamically and then dispatches the message being sent to it by the client. Thus, we can just leverage this AggregateCache behavior to lazily create an Aggregate Actor when the first message is sent to it.

But wait. Isn’t it a problem for the Aggregate Actor not to exist at all until a message is sent to it? Well, not if you consider an Actor’s state is mutated by it’s handling various messages. Therefore, the Actor really has an empty initial state until it receives it’s first command message. Here’s how it will work:

object DomainModelPrototype extends CompletableApp(1) {

  val model = DomainModel("OrderProcessing")

  model.registerAggregateType("co.vaughnvernon.orderprocessing.domain.model.Order")

  val order = model.aggregateOf("co.vaughnvernon.orderprocessing.domain.model.Order", "123")

  order ! InitializeOrder(249.95)

  ...

Did you notice some fundamental changes to the client code compared to the previous example? First of all, now when registering an Aggregate type we use the fully-qualified class name of the Actor. Then when the Aggregate is created using aggregateOf(), we pass in only the type and the globally unique id. We no longer pass in the Props, because there will purposely be no initial state in the Actor until it receives its first message, which you can see is sent as InitializeOrder just following its creation by aggregateOf().

Improved Solution

So, allow me to reintroduce the Scala classes from the top down, starting with the new DomainModel and companion object:

object DomainModel {
  def apply(name: String): DomainModel = {
    new DomainModel(name)
  }
}

class DomainModel(name: String) {
  val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  def aggregateOf(typeName: String, id: String): AggregateRef = {
    if (aggregateTypeRegistry.contains(typeName)) {
      val aggregateType = aggregateTypeRegistry(typeName)
      aggregateType.cacheActor ! RegisterAggregateId(id)
      AggregateRef(id, aggregateType.cacheActor)
    } else {
      throw new IllegalStateException("DomainModel type registry does not have a $typeName")
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      aggregateTypeRegistry(typeName) = AggregateType(actorRef)
    }
  }

  def shutdown() = {
    system.shutdown()
  }
}

Note that the DomainModel no longer attempts to use the AggregateCacheWorker. In fact, there is no longer such a worker class. Instead aggregateOf() now sends a message to the AggregateCache under which the Aggregate Actor is to exist:

aggregateType.cacheActor ! RegisterAggregateId(id)

This leads to the new implementation of the AggregateCache:

class AggregateCache(typeName: String) extends Actor {
  val aggregateClass: Class[Actor] = Class.forName(typeName).asInstanceOf[Class[Actor]]
  val aggregateIds = scala.collection.mutable.Set[String]()

  def receive = {
    case message: CacheMessage =>
      val aggregate = context.child(message.id).getOrElse {
        if (!aggregateIds.contains(message.id)) {
          throw new IllegalStateException(s"No aggregate of type $typeName and id ${message.id}")
        } else {
          context.actorOf(new Props(aggregateClass), message.id)
        }
      }
      aggregate.tell(message.actualMessage, message.sender)

    case register: RegisterAggregateId =>
      this.aggregateIds.add(register.id)
  }
}

The AggregateCache no longer holds a Map of type names to Props. Instead, it now contains a Set of unique identities for each Aggregate that has been registered. You can see that each identity is registered when a RegisterAggregateId message is sent from to DomainModel and received by the AggregateCache.

Even so, what would happen if the RegisterAggregateId message is not received by the AggregateCache until after the first message is sent from the client to the yet-to-be-created Aggregate Actor? Actually this is impossible because of a simple rule of Actor Model: When a message is sent to an Actor and the Actor has a default FIFO mailbox, that message is guaranteed to be received by the Actor before any subsequently sent messages. Thus, when the DomainModel sends RegisterAggregateId to the AggregateCache, there is no way that a subsequent CacheMessage sent to the Aggregate Actor in question will be received by the AggregateCache before the RegisterAggregateId is received.

Now back to the state of the AggregateCache. It also has an aggregateClass instance value, which is created from the typeName. This is possible because the typeName now must be the fully-qualified class name of the Aggregate Actor type. The aggregateClass is passed as the only Props argument to the actorOf() function of the ActorContext. This allows the Actor to be dynamically created using the specific type for which each specific AggregateCache exists.

The support classes are only slightly different from the previous example:

case class AggregateRef(id: String, cache: ActorRef) {
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }
}

case class AggregateType(cacheActor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case class RegisterAggregateId(id: String)

Finally, here are the changes to the Order Aggregate:

class Order extends Actor {
  var amount: Double = _

  def receive = {
    case init: InitializeOrder =>
      println(s"Initializing Order with $init")
      this.amount = init.amount
    case processOrder: ProcessOrder =>
      println(s"Processing Order is $processOrder")
      DomainModelPrototype.completedStep()
  }
}

case class InitializeOrder(amount: Double)
case class ProcessOrder

I think this addresses all the the issues that were apparent from the original post. Hopefully it has reenforced the basic rule of Actor Model: Share Nothing.

There are still several other pieces of the DDD with Scala and Akka puzzle to snap into place. I’ll be introducing those over the next few weeks. Well, I can only promise that it will be ASAP.

Comments

  1. I’m afraid that Akka didn’t guarantee message order in that way…
    according to http://doc.akka.io/docs/akka/2.2.3/general/message-delivery-guarantees.html#Discussion__Message_Ordering
    message ordering is only guaranteed per actor pair, but there are Two pairs in your code: DomainModel AggregateCache and AggregateRef(client code) AggregateCache.

    Chris~