Summary of CRDTs

In this article I am going to introduce you to CRDTs, or Conflict-Free Replicated Data Types. You will see under what conditions CRDTs might be useful, why you might want to use such a data type, what kinds of CRDTs are available, and how you can use them.

Think of having a Shopping Cart persisted by a multi-node replicated store, and then having one of the nodes become unavailable. If you have at least 3 nodes in the store, which of the remaining nodes contains the most recent version of a given user’s Shopping Cart? You need to display the most recent one. How would you know? What can you do to resolve this problem when the user is just ready to check out and pay?

Semantic Resolution

One typical approach is to use what is called Semantic Resolution. In this case you take all copies of the Shopping Cart from each of the remaining storage replicas and apply some “business logic” to the situation. The business says that we want to make the maximum sale, so the “logical” thing to do is merge the two remaining Shopping Carts so that the “official” Shopping Cart has the maximum number of products, each with the maximum count of items, from both. However, if the user has just recently removed an item from their Shopping Cart, then merging that item back into their possession looks like an error, and is at least annoying. In the worst case the user ends up not noticing the results of the merge, buys the extra item, and then returns it for their money back. This is probably much more annoying than noticing the extra item in the Shopping Cart and removing it before final check out.

Isn’t there a better way?

Introducing CRDTs

A CRDT is literally a data type, and there are two basic ways to implement them. Both of these approaches adhere to the CRDT acronym because both are named starting with the letter C:

  1. Convergent Replicated Data Types: This is where data converges, or merges. You focus on the data on each of the storage replicas and merge the data changes between each of the replicas in order to bring each replica up to date. Obviously to make this work well you must in some way time stamp the data so you can merge in a proper way. You could use a simple time stamp, but you may have to use Vector Clocks or some similar approach to get this to work out well.
  2. Commutative Replicated Data Types: Maybe you remember Commutative Properties from mathematics. For example, you can take two numbers, such as 2 and 3, and whichever way you add them together, the result is always 5. That is, 2+3 yields the same value as 3+2. In this case we are more focused on the operations rather than the data, and we communicate the operations to each of the replicas rather than merging data. Regardless of the data they have, applying the operations they receive will yield the correct state. Commutative types work well when your messaging supports at-least-once delivery, such as is the case with Akka Persistence.

In the following examples we will discuss both Convergent and Commutative types.

One Convergent-Only Replicated Data Type

Here I will discuss only one Convergent-only type, the Register. A Register type has only two operations that you can perform on it: Set and Get. Because the only modification that you can make to a Register is to set its current value, it cannot be managed concurrently as a Commutative type.

There are two ways to implement a Register:

  1. Last-Write-Wins (LWW): Here you keep the value and a timestamp of when the value was set on a given replica. The timestamp allows the value to be merged on any given replica by comparing the timestamp of the local value with that of the value to be merged. If the timestamp of the local value is older than that of the sent value, then the local value is updated.
  2. Multi-Value: Here you may keep multiple copies of the same conceptual value, but maintain information that indicates when the value came to be set. It’s possible to hand to a client multiple values of the same thing and allow the client to apply conflict resolution to determine which of the multiple values it shall choose as “current.”

There are advantages to both of these approaches. Multi-Value provides the most versatile, but also the most complex, approach. Although a Multi-Value may seem to have the same nature as the poor merging example given under Semantic Resolution, the data that indicates the differences can be based on alternative criteria such as a Version Vector or a Vector Clock.
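
To make the Last-Write-Wins approach concrete, here is a minimal sketch of an LWW Register in Scala. It is illustrative only, and it assumes a simple millisecond timestamp; as noted above, a real implementation might use a Version Vector or a Vector Clock instead.

// Illustrative LWW Register sketch; assumes a simple millisecond
// timestamp rather than a Version Vector or Vector Clock.
case class LWWRegister[A](value: A, timestamp: Long) {
  // Set keeps the newer of the current value and the incoming write.
  def set(newValue: A, at: Long): LWWRegister[A] =
    if (at > timestamp) LWWRegister(newValue, at) else this

  // Merge on any replica: the value with the newer timestamp wins.
  def merge(that: LWWRegister[A]): LWWRegister[A] =
    if (that.timestamp > timestamp) that else this
}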

Some Commutative Replicated Data Types

Here we are going to discuss several CRDTs:

  1. Counters: G-Counters and PN-Counters
  2. Sets: G-Sets, 2P-Sets, U-Set, LWW-Element-Sets, and OR-Sets
  3. Graphs

Let’s take a look at these one by one.

Counters

There are two types of counters:

  1. G-Counter: This is a grow-only counter, one that only understands addition. The implementation of a G-Counter basically implements a Version Vector or Vector Clock.
  2. PN-Counter: This is a positive-negative counter, one that understands how to both add and subtract. To support this there are actually two G-Counters that are maintained: a Positive G-Counter and a Negative G-Counter.

To explain how G-Counter works, let’s consider its operations on two replicas:

  • R1 increments the G-Counter A, yielding a value of 1
  • R1 increments the G-Counter A again, yielding a value of 2
  • R2 increments the G-Counter A for the first time, yielding a value of 1

Now the increment operations are sent from R1 to R2, and from R2 to R1:

  • R2 receives an increment notification from R1 and increments G-Counter A from 1 to 2
  • R2 receives an increment notification from R1 and increments G-Counter A from 2 to 3
  • R1 receives an increment notification from R2 and increments G-Counter A from 2 to 3
  • Both R1 and R2 are now up to date with an A value of 3
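
Here is a minimal Scala sketch of the G-Counter just walked through. The names are illustrative; each replica keeps a count per replica identity, very much like a Version Vector:

case class GCounter(counts: Map[String, Long] = Map.empty) {
  // The observable value is the sum of all per-replica counts.
  def value: Long = counts.values.sum

  // A local increment at the given replica; this is the operation
  // that would be sent to the other replicas.
  def increment(replicaId: String): GCounter =
    copy(counts = counts.updated(replicaId, counts.getOrElse(replicaId, 0L) + 1L))

  // State-based merge, should replicas exchange full state rather
  // than operations: take the per-replica maximum.
  def merge(that: GCounter): GCounter =
    GCounter((counts.keySet ++ that.counts.keySet).map { id =>
      id -> math.max(counts.getOrElse(id, 0L), that.counts.getOrElse(id, 0L))
    }.toMap)
}

Replaying the walkthrough above:

val r1 = GCounter().increment("R1").increment("R1") // R1 sees a value of 2
val r2 = GCounter().increment("R2")                 // R2 sees a value of 1
r1.merge(r2).value                                  // both converge on 3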

Now consider an example of how the PN-Counter works between two replicas:

  • R1 increments PN-Counter B, yielding a PN value of { 1, 0 }
  • R1 increments PN-Counter B, yielding a PN value of { 2, 0 }
  • R2 increments PN-Counter B, yielding a PN value of { 1, 0 }
  • R2 decrements PN-Counter B, yielding a PN value of { 1, 1 }

Now the increment and decrement operations are sent from R1 to R2 and from R2 to R1:

  • R2 receives an increment notification from R1 and increments its P counter from 1 to 2, yielding { 2, 1 }
  • R2 receives an increment notification from R1 and increments its P counter from 2 to 3, yielding { 3, 1 }
  • R1 receives an increment notification from R2 and increments its P counter from 2 to 3, yielding { 3, 0 }
  • R1 receives a decrement notification from R2 and increments its N counter from 0 to 1, yielding { 3, 1 }
  • Both R1 and R2 are now up to date with a value of B { 3, 1 }. The actual value of the PN-Counter B is yielded by subtracting the N value from the P value. So 3-1 is 2, which is the actual value of PN-Counter B.
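
A minimal PN-Counter sketch builds directly on the G-Counter sketch above; again, the names are illustrative:

case class PNCounter(p: GCounter = GCounter(), n: GCounter = GCounter()) {
  // An increment grows the P counter, a decrement grows the N counter.
  def increment(replicaId: String): PNCounter = copy(p = p.increment(replicaId))
  def decrement(replicaId: String): PNCounter = copy(n = n.increment(replicaId))

  // The actual value is P minus N; for example, { 3, 1 } yields 2.
  def value: Long = p.value - n.value

  def merge(that: PNCounter): PNCounter =
    PNCounter(p.merge(that.p), n.merge(that.n))
}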

You can use a PN-Counter to hold the count of each of the product items that are currently in the Shopping Cart. In case the counter yields a value less than 0, you can always evaluate it as 0 and not show the product in the Shopping Cart.

There are other types of Counters available, but not covered here.

Sets

A G-Set, or a grow-only set, is implemented very much like a G-Counter. Again, we will examine the use of a G-Set on two replicas:

  • R1 adds the value 1 to G-Set C, yielding a set of { 1 }
  • R1 adds the value 2 to G-Set C, yielding a set of { 1, 2 }
  • R2 adds the value 3 to G-Set C, yielding a set of { 3 }

Now the add operations are sent from R1 to R2, and from R2 to R1:

  • R2 receives an [ add 1 ] notification from R1 and R2 adds the value 1 to G-Set C, yielding { 1, 3 }
  • R2 receives an [ add 2 ] notification from R1 and R2 adds the value 2 to G-Set C, yielding { 1, 2, 3 }
  • R1 receives an [ add 3 ] notification from R2 and R1 adds the value 3 to G-Set C, yielding { 1, 2, 3 }
  • Both R1 and R2 are now up to date with a set of C as { 1, 2, 3 }

There are other kinds of sets supported, but I don’t cover them extensively here.

There is a 2P-Set, which is a 2-phase set. It is managed much the same way as a PN-Counter; that is, there is a G-Set of added elements and a G-Set of removed elements, which are merged on each replica to yield the actual up-to-date set. You cannot add an element to the removed set that does not exist locally in your added set, which effectively means you cannot remove an element that has not already been added. The removed set functions as a tombstone set, which prevents removed items from ever being added back into the merged sets. Be careful what you remove, because it can never come back.
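
Here is a minimal 2P-Set sketch along the lines just described, with a grow-only add set and a grow-only tombstone set; the names are illustrative:

case class TwoPhaseSet[A](
    added: Set[A] = Set.empty[A],
    removed: Set[A] = Set.empty[A]) {

  def add(element: A): TwoPhaseSet[A] = copy(added = added + element)

  // An element may be removed only if it was added locally first; once
  // in the tombstone set it can never come back.
  def remove(element: A): TwoPhaseSet[A] =
    if (added.contains(element)) copy(removed = removed + element)
    else throw new IllegalStateException(s"Cannot remove element never added: $element")

  // The up-to-date set is the add set minus the tombstones.
  def value: Set[A] = added -- removed

  def merge(that: TwoPhaseSet[A]): TwoPhaseSet[A] =
    TwoPhaseSet(added ++ that.added, removed ++ that.removed)
}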

In a U-Set each element is added with a unique identity, and you can only remove an element by using its unique identity. A remove must be performed as a causal operation, since it can only follow the add of the element with the same identity.

There is also the LWW-Element-Set, where each item added to the set and removed from the set carries a timestamp. An element is in the current set if its most recent add has a later timestamp than its most recent remove. This allows removed elements to be added back to the LWW-Element-Set, which is not possible with the 2P-Set.
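
A minimal sketch might keep the latest add and remove timestamps per element; again, illustrative only:

case class LWWElementSet[A](
    adds: Map[A, Long] = Map.empty[A, Long],
    removes: Map[A, Long] = Map.empty[A, Long]) {

  def add(element: A, at: Long): LWWElementSet[A] =
    copy(adds = adds.updated(element, math.max(at, adds.getOrElse(element, Long.MinValue))))

  def remove(element: A, at: Long): LWWElementSet[A] =
    copy(removes = removes.updated(element, math.max(at, removes.getOrElse(element, Long.MinValue))))

  // An element is present when its latest add is newer than its latest remove.
  def value: Set[A] =
    adds.collect { case (e, t) if t > removes.getOrElse(e, Long.MinValue) => e }.toSet
}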

An OR-Set is an observed-remove set, where each element added to the set is assigned a unique identity. This set is kept up to date by applying causal consistency. If two of the same elements, each with a different identity, are added to the set, both will be added, although a get (look up) of the element will yield only one element, the duplicate being masked as the definition of a set requires. When a remove of a specific element is requested, all identities of that element are matched at the given source replica. Those identities are then used to remove the same element from all replicas. A remove is performed only if it historically and causally follows an add of the same element (with the same identity), which conforms to the sequential specification of a set. In other words, there must be an element matched at the source replica for this operation to work across all replicas.
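
Here is a minimal OR-Set sketch, tagging each add with a unique identity; java.util.UUID stands in for whatever unique identity scheme is actually used:

import java.util.UUID

case class ORSet[A](entries: Set[(A, UUID)] = Set.empty[(A, UUID)]) {
  // Each add carries a fresh unique identity.
  def add(element: A): ORSet[A] =
    copy(entries = entries + (element -> UUID.randomUUID()))

  // The identities of the element observed at the source replica; these
  // are what a remove operation ships to the other replicas.
  def observedIds(element: A): Set[UUID] =
    entries.collect { case (e, id) if e == element => id }

  // Remove exactly the observed adds; a concurrent add on another
  // replica carries an unobserved identity and therefore survives.
  def remove(ids: Set[UUID]): ORSet[A] =
    copy(entries = entries.filterNot { case (_, id) => ids.contains(id) })

  // Look up masks duplicate identities, as the definition of set requires.
  def value: Set[A] = entries.map { case (e, _) => e }
}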

You could use a 2P-Set to hold the products that are currently in the Shopping Cart, but the problem here is that if the user removed a product and then wanted to add it back, it would not be possible. For that reason a U-Set, LWW-Element-Set, or an OR-Set would be better.

Graphs

Although you might think of graphs as being very powerful, there are problems with applying Graph CRDTs in a distributed system. For this reason I suggest referring to the paper below if you think you are interested in using the Graph type.

Yeah, But…

You ask: “Why wouldn’t you use a Value Object with a built-in time stamp for the Shopping Cart scenario described above? After all, it’s just a matter of figuring out which replicated Shopping Cart has the most recent data.”

First of all, modeling this as a Value Object with a time stamp would work. It’s just that there may be advantages in using CRDTs. Consider some of the following reasons.

In the case of the Shopping Cart the main advantage is in redundant data suppression. Only the changes made to the Shopping Cart must be copied to other nodes, and since most of the data does not make each notification journey, bringing all nodes up to date is much faster and more likely to succeed quickly. Then, if the node hosting the most up-to-date Shopping Cart is lost, the smaller amount of data to replicate means that other nodes can be brought up to date sooner.

On the other hand, the Shopping Cart is probably not the best example to reason about CRDTs. The limitation of the Shopping Cart example is that it is owned by only one user, and probably most of the changes will be saved to the same node first, then propagated to other nodes. However, if you think of an Aggregate that can be used and modified by multiple users simultaneously, it’s more likely that multiple nodes will be modified separately at the same time. Chances are good that if non-conflicting changes are made by each user, then each of the changes can be replicated to multiple nodes without any of the users being dinged for a conflict (the typical optimistic concurrency failure). Note that the same thing can be accomplished with Aggregates and Event Sourcing when events are exchanged between nodes.

The problem is easier to reason about if you simply think about many users hitting the same URL, with a counter maintained for that URL and partitioned across many nodes. As each of many users hits the URL, the counter is incremented on any one of the multiple nodes. What’s the actual total? Well, each time the counter is incremented on any given node it will take some time before the other nodes are also incremented for that same URL access. Yet, within some period of time it is possible that one or more nodes will reflect the true total of the number of hits, that is, if the URL isn’t pounded every few milliseconds 24×7. In any case, any one of the nodes will show at least a close approximation of the actual total.
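
Using the illustrative GCounter sketch from the Counters section, the URL hit counter plays out like this: each node increments locally, and the nodes converge on merge.

// Hypothetical example reusing the GCounter sketch from above.
val node1 = GCounter().increment("node1").increment("node1") // 2 local hits
val node2 = GCounter().increment("node2")                    // 1 local hit

node1.value              // 2, an approximation before convergence
node2.value              // 1, likewise an approximation
node1.merge(node2).value // 3, the true total once state is exchanged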

References

The official paper on CRDTs is provided by INRIA. There is also a good overview presentation by Sean Cribbs.

Dealing with Device Reads Using Akka

You have a device monitor actor. It must poll the device every time an interval expires. When data starts flowing, it flows a lot. Sometimes, however, the device will not have readable data for long periods of time (several seconds), and continuing to read the device on short intervals will take CPU core cycles away from other parts of the actor system that are busy processing the data that has already been taken in. How can we make the device monitor actor more in tune with the way the device operates?

Have the device monitor actor create, as part of its construction, an instance of CappedBackOffScheduler. This scheduler is used to send the “try read” messages to the monitor actor on intervals. The interval starts out at a minimal 500 milliseconds. On any given probe of the device that results in a successful read, the following interval will be 500 milliseconds. Otherwise, each successive failed read doubles the next interval, but only until a time cap is reached. Here is the CappedBackOffScheduler implementation:

import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration
import akka.actor.{ ActorRef, ActorSystem }

class CappedBackOffScheduler(
    minimumInterval: Int,
    maximumInterval: Int,
    system: ActorSystem,
    receiver: ActorRef,
    message: Any) {

  // Execution context required by scheduleOnce
  import system.dispatcher

  var interval = minimumInterval

  // Double the interval, up to the cap, and schedule the next probe.
  def backOff = {
    interval = interval * 2
    if (interval > maximumInterval) interval = maximumInterval
    schedule
  }

  // A successful read resets the interval to the minimum.
  def reset = {
    interval = minimumInterval
    schedule
  }

  private def schedule = {
    val duration = Duration.create(interval, TimeUnit.MILLISECONDS)
    system.scheduler.scheduleOnce(duration, receiver, message)
  }
}

Each time the scheduler is told to backOff, it calculates a new interval, at least until it reaches 15 seconds. When the scheduler is told to reset following each successful read, the interval is (re)set to half a second. Either way, the interval is used to schedule a new “try read” message. The message will be sent to the device monitor once the interval expires.

Here’s how to use the CappedBackOffScheduler:

import akka.actor.Actor

case class TryRead()

class DeviceMonitor(device: Device) extends Actor {
  val scheduler =
            new CappedBackOffScheduler(
                    500,
                    15000,
                    context.system,
                    self,
                    TryRead())
  
  def tryRead = {
    val data = device.readData(500)
    if (data.isDefined) {
      println(s"HIT: ${data.get}")
      scheduler.reset
    } else {
      println(s"MISS")
      scheduler.backOff
    }
  }
  
  def receive = {
    case request: TryRead =>
      tryRead
  }
}
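
For completeness, the Device that the monitor polls is assumed to look something like this; a hypothetical trait, not part of the original code:

// Hypothetical Device abstraction assumed by DeviceMonitor.
trait Device {
  // Attempt a read, blocking for at most timeoutMillis; None on a miss.
  def readData(timeoutMillis: Int): Option[String]
}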

Note that the device readData() attempt itself has a timeout; in this case we allow half a second for readData() to succeed. This timeout must be carefully considered, because the thread currently processing the TryRead message will block until the readData() attempt returns. And here’s some sample output:

HIT: ...
HIT: ...
HIT: ...
MISS
MISS
HIT: ...
HIT: ...
HIT: ...
MISS
HIT: ...
HIT: ...
HIT: ...
HIT: ...
MISS
MISS
MISS
MISS
HIT: ...

DDD with Scala and Akka Revisited

There were a number of issues with the code I wrote in the post Using Scala and Akka with Domain-Driven Design. I am addressing those problems in this post and cleaning up the code. The good thing about revisiting this topic is that you can learn from the problems found in the previous post, and how to fix them.

Share Nothing

A major problem in the previous post is that I shared part of an Actor’s internal state with the outside. Recall that one of the basic rules of Actor Model is to share nothing. Oops.

The AggregateCache used a helper named AggregateCacheWorker. The existence of this class in itself is not a problem, especially when it is acting on behalf of the AggregateCache. The problem is that the DomainModel can send a ProvideWorker message to the AggregateCache to request a reference to its internal AggregateCacheWorker. That’s just wrong. The DomainModel should not be able to obtain a reference to any part of the state of AggregateCache.

So how can this problem be corrected? The main clue is to ask: Why did the DomainModel imagine it needed to use the AggregateCacheWorker? The reason is that the DomainModel needed to delegate Aggregate Actor creation to the AggregateCacheWorker because the AggregateCacheWorker holds the ActorContext needed to create a child of the AggregateCache. Yet, this is clearly not the way we should attempt to create new Aggregate Actors.

The solution to the problem actually already exists in the AggregateCache itself. Recall that when a message is sent from a client to an Aggregate Actor, the AggregateCache will check to see if that specific child Actor is currently in memory or not. If the Actor is not currently in memory, the AggregateCache creates the child Aggregate Actor dynamically and then dispatches the message being sent to it by the client. Thus, we can just leverage this AggregateCache behavior to lazily create an Aggregate Actor when the first message is sent to it.

But wait. Isn’t it a problem for the Aggregate Actor not to exist at all until a message is sent to it? Well, not if you consider that an Actor’s state is mutated by its handling of various messages. Therefore, the Actor really has an empty initial state until it receives its first command message. Here’s how it will work:

object DomainModelPrototype extends CompletableApp(1) {

  val model = DomainModel("OrderProcessing")

  model.registerAggregateType("co.vaughnvernon.orderprocessing.domain.model.Order")

  val order = model.aggregateOf("co.vaughnvernon.orderprocessing.domain.model.Order", "123")

  order ! InitializeOrder(249.95)

  ...

Did you notice some fundamental changes to the client code compared to the previous example? First of all, now when registering an Aggregate type we use the fully-qualified class name of the Actor. Then when the Aggregate is created using aggregateOf(), we pass in only the type and the globally unique id. We no longer pass in the Props, because there will purposely be no initial state in the Actor until it receives its first message, which you can see is sent as InitializeOrder just following its creation by aggregateOf().

Improved Solution

So, allow me to reintroduce the Scala classes from the top down, starting with the new DomainModel and companion object:

object DomainModel {
  def apply(name: String): DomainModel = {
    new DomainModel(name)
  }
}

class DomainModel(name: String) {
  val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  def aggregateOf(typeName: String, id: String): AggregateRef = {
    if (aggregateTypeRegistry.contains(typeName)) {
      val aggregateType = aggregateTypeRegistry(typeName)
      aggregateType.cacheActor ! RegisterAggregateId(id)
      AggregateRef(id, aggregateType.cacheActor)
    } else {
      throw new IllegalStateException(s"DomainModel type registry does not have a $typeName")
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      aggregateTypeRegistry(typeName) = AggregateType(actorRef)
    }
  }

  def shutdown() = {
    system.shutdown()
  }
}

Note that the DomainModel no longer attempts to use the AggregateCacheWorker. In fact, there is no longer such a worker class. Instead aggregateOf() now sends a message to the AggregateCache under which the Aggregate Actor is to exist:

aggregateType.cacheActor ! RegisterAggregateId(id)

This leads to the new implementation of the AggregateCache:

class AggregateCache(typeName: String) extends Actor {
  val aggregateClass: Class[Actor] = Class.forName(typeName).asInstanceOf[Class[Actor]]
  val aggregateIds = scala.collection.mutable.Set[String]()

  def receive = {
    case message: CacheMessage =>
      val aggregate = context.child(message.id).getOrElse {
        if (!aggregateIds.contains(message.id)) {
          throw new IllegalStateException(s"No aggregate of type $typeName and id ${message.id}")
        } else {
          context.actorOf(Props(aggregateClass), message.id)
        }
      }
      aggregate.tell(message.actualMessage, message.sender)

    case register: RegisterAggregateId =>
      this.aggregateIds.add(register.id)
  }
}

The AggregateCache no longer holds a Map of type names to Props. Instead, it now contains a Set of unique identities for each Aggregate that has been registered. You can see that each identity is registered when a RegisterAggregateId message is sent from the DomainModel and received by the AggregateCache.

Even so, what would happen if the RegisterAggregateId message is not received by the AggregateCache until after the first message is sent from the client to the yet-to-be-created Aggregate Actor? Actually this is impossible because of a simple rule of Actor Model: When a message is sent to an Actor and the Actor has a default FIFO mailbox, that message is guaranteed to be received by the Actor before any subsequently sent messages. Thus, when the DomainModel sends RegisterAggregateId to the AggregateCache, there is no way that a subsequent CacheMessage sent to the Aggregate Actor in question will be received by the AggregateCache before the RegisterAggregateId is received.

Now back to the state of the AggregateCache. It also has an aggregateClass instance value, which is created from the typeName. This is possible because the typeName now must be the fully-qualified class name of the Aggregate Actor type. The aggregateClass is passed as the only Props argument to the actorOf() function of the ActorContext. This allows the Actor to be dynamically created using the specific type for which each specific AggregateCache exists.

The support classes are only slightly different from the previous example:

case class AggregateRef(id: String, cache: ActorRef) {
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }
}

case class AggregateType(cacheActor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case class RegisterAggregateId(id: String)

Finally, here are the changes to the Order Aggregate:

class Order extends Actor {
  var amount: Double = _

  def receive = {
    case init: InitializeOrder =>
      println(s"Initializing Order with $init")
      this.amount = init.amount
    case processOrder: ProcessOrder =>
      println(s"Processing Order is $processOrder")
      DomainModelPrototype.completedStep()
  }
}

case class InitializeOrder(amount: Double)
case class ProcessOrder()

I think this addresses all the issues that were apparent from the original post. Hopefully it has reinforced the basic rule of Actor Model: Share Nothing.

There are still several other pieces of the DDD with Scala and Akka puzzle to snap into place. I’ll be introducing those over the next few weeks. Well, I can only promise that it will be ASAP.

Using Scala and Akka with Domain-Driven Design

If you’ve been following my theme for a while, you know I’ve spent a considerable amount of effort promoting Scala and Akka for use when Implementing Domain-Driven Design. Few seem to share my vision of a completely explicit use of Actors as Aggregates, with direct message sending between clients and the Aggregates (in which case the “client” may be, for example, the User Interface or another Aggregate).

NOTE: There is a follow up to this post where I address the problems found herein.

More recently there have been numerous suggestions to place various wrappers around the domain model’s Aggregates. This smells of Application Layer and its Application Services, or at least like a named cache/region such as is used in Coherence or GemFire. That’s absolutely not what I want. Sure, those mechanisms could exist behind the scenes, but not between a client and the Aggregate instance (an Actor) that it wants to communicate with. The latter makes me really unhappy. The main point of my emphasis on the use of Actor Model in the first place was to get rid of all the extraneous architecture layers that cloud our vision of what the core of the application—its domain model—is really doing.

Rather than abandon Akka as a means to employ Actor Model with DDD, what I did over the past few hours was prototype a few useful abstractions as a really simple set of Scala classes that could be used to implement my vision. So, here you go.

It seems appropriate to me that the basic interface to the desired functionality should be a DomainModel. Here’s a simple class and companion object:

object DomainModel {
  def apply(name: String): DomainModel = {
    new DomainModel(name)
  }
}

class DomainModel(name: String) {
  val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  def aggregateOf(typeName: String, props: Props, id: String) = {
    if (aggregateTypeRegistry.contains(typeName)) {
      val aggregateType = aggregateTypeRegistry(typeName)
      val worker = aggregateType.worker
      val actorRef = worker.aggregateOf(props, id)
      val cacheActor = aggregateType.actor
      AggregateRef(id, cacheActor)
    } else {
      AggregateRef(null, null)
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      implicit val timeout = Timeout(5 seconds)
      val future = actorRef ? ProvideWorker
      val worker = Await.result(future, timeout.duration).asInstanceOf[AggregateCacheWorker]
      aggregateTypeRegistry(typeName) = AggregateType(worker, actorRef)
    }
  }

  def shutdown() = {
    system.shutdown()
  }
}

After your DomainModel is initialized, the idea is to allow for the registration of various Aggregate types using registerAggregateType(). After you have registered any number of types that are appropriate for your Bounded Context, you can start creating instances of them using aggregateOf(). Here you pass the type name of the Aggregate that you want to create, as well as the Props of the Akka Actor, and the id of the Aggregate. The id is both the name of the Actor and the globally unique identity of the Aggregate instance being created. Finally, if your application needs to be shut down you must call the DomainModel shutdown() function.

Now, what is the reason for registering Aggregate types? This serves a few different purposes. First of all, there is an underlying Actor that implements a special node-specific cache. This cache Actor will serve as the parent of all Aggregate Actors within the registered type. All messages that will be sent to the Actor that implements a specific Aggregate instance must pass through the cache Actor. Here’s the class that serves as the parent of all Aggregate instances within a specified type, or named cache:

class AggregateCache(typeName: String) extends Actor {
  val worker = new AggregateCacheWorker(context)

  def receive = {
    case message: CacheMessage =>
      val aggregate = context.child(message.id).getOrElse {
        context.actorOf(worker.propsFor(message.id), message.id)
      }
      aggregate.tell(message.actualMessage, message.sender)

    case ProvideWorker =>
      sender ! worker
  }
}

As you can see, the main functionality of the AggregateCache is to look up child Aggregate Actors and dispatch messages to them. If the look up fails, it means that the Aggregate Actor instance is no longer in memory. This implies that Aggregates can be transient; that is, they may exist in memory for some period of time, and then after some degree of inactivity they can be closed and removed from memory. However, if a message is sent to the Aggregate Actor by a client, the client needs to be assured that the Aggregate will receive the message no matter what. This is where the getOrElse expression comes into play. The Aggregate Actor will be dynamically reloaded, and then the AggregateCache can dispatch the message to it.

The AggregateCache Actor uses an internal worker object:

class AggregateCacheWorker(context: ActorContext) {
  val aggregateInfo = scala.collection.mutable.Map[String, Props]()

  def aggregateOf(props: Props, id: String): ActorRef = {
    if (!aggregateInfo.contains(id)) {
      aggregateInfo(id) = props
      context.actorOf(props, id)
    } else {
      throw new IllegalStateException(s"Aggregate with id $id already exists")
    }
  }

  def propsFor(id: String): Props = {
    if (aggregateInfo.contains(id)) {
      aggregateInfo(id)
    } else {
      throw new IllegalStateException(s"No Props for aggregate of id $id")
    }
  }
}

The AggregateCacheWorker handles support work that is needed by the DomainModel and AggregateCache. This includes creating new Aggregate instances by way of aggregateOf(), and also providing the Props instance, via propsFor(), for each Aggregate Actor that may need to be dynamically reloaded.

Finally, there are a few simple support classes:

case class AggregateRef(id: String, cache: ActorRef) {
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }
}

case class AggregateType(worker: AggregateCacheWorker, actor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case object ProvideWorker

The AggregateRef is a really important abstraction. Rather than returning an ActorRef when an Aggregate Actor is created, the DomainModel instead returns an AggregateRef. As you can see from its two functions, the AggregateRef knows how to send messages to the underlying Aggregate Actor. However, it’s not a “straight shot,” so to speak, as it is with ActorRef. Instead, all messages sent to Aggregates via AggregateRef’s tell() are first sent to the AggregateCache. This allows for look up and any necessary dynamic loading of the Aggregate Actors into the cache. Messages are sent to AggregateCache as a CacheMessage instance.

So, how does it work? Here’s a runner for the prototype:

class Order(id: String, amount: Double) extends Actor {
  def receive = {
    case p: ProcessOrder =>
      println(s"Processing Order is $p")
      DomainModelPrototype.completedStep()
    case a: Any =>
      println(s"message is $a")
      DomainModelPrototype.completedStep()
  }
}

case class ProcessOrder()

object DomainModelPrototype extends CompletableApp(1) {

  val model = DomainModel("prototype")

  model.registerAggregateType("Order")

  val order = model.aggregateOf("Order", Props(new Order("123", 249.95)), "123")

  order ! ProcessOrder()

  awaitCompletion()

  model.shutdown()

  println("DomainModelPrototype: is completed.")
}

This way the Order Aggregate Actor can, in effect, receive messages directly from a client. This is not actually a direct message send in the sense that ActorRef manages. Even so, ActorRef’s message sends are also not as “direct” as you might like to think. So by means of DomainModel and AggregateRef—the only abstractions of necessity exposed to the client—we get a smooth and seamless interface to Aggregate Actors. Even better, these few abstractions address the way Akka needs to work, but cover over the particulars.

NOTE: If you think the above code is a little hacky, you would be correct. I made some tradeoffs just to express my vision in Scala and Akka without getting bogged down in laborious details. The little bit of code should be revisited for some clean up. Yet, it still needs a bit more details for handling distributed caches, etc. Right now it’s just a prototype. What’s good about the revisit to the code is that corrections end up being a good learning example.

Thanks to the Akka Team, including Roland Kuhn and Patrik Nordwall, for ideas about making Akka happy. Their advice was to use some sort of intermediary to dynamically look up child Aggregate Actors, which I implemented as AggregateCache. Yet, it is my use of AggregateRef that allows the intermediary to be removed from the view of clients.

Anyway, I’ve been beating this drum for a while. I feel good that I finally had a few hours to hammer out a prototype solution for this specific set of problems. There is still a large body of work that I have accomplished around DDD with Scala and Akka that I will be presenting ASAP.

EAI Patterns with Actor Model: Message Channel

After a discussion of all those fancy doodahdah filters, routers, and buses, we are back to the plain old Message Channel. Yes, I did skip the Process Manager. We will get back to it, but for now it seems we all need a bit of boredom. Well, not really.

Sure, a Message Channel is simply the means by which a message producer and a message consumer communicate. Using a typical messaging mechanism, a producer creates a channel with an identity/name that is made known to the one who wants to consume the producer’s messages. The consumer opens the receiver end of the channel, prepared to consume messages. Once the producer sends a message, the consumer receives it.

It works a little differently when using the Actor Model. Each Actor provides its own Message Channel by means of which it will receive messages. This channel is often the Actor’s mailbox, and basically functions as a FIFO queue for all incoming messages. In the case of the Actor Model it’s not the producer that creates this mailbox channel. Rather, it’s the creation of the Actor that causes the underlying conceptual Message Channel to be created. And according to the rules of Actor Model, a message producer must know the address of the Actor—its Message Channel—to which it wishes to send messages.
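
As a tiny, illustrative Akka sketch of this idea (the names here are mine): creating the Actor creates its mailbox, and the producer needs only the Actor’s address to send to that channel.

import akka.actor.{ Actor, ActorSystem, Props }

// Creating the Actor creates its mailbox, the conceptual Message Channel.
class Consumer extends Actor {
  def receive = {
    case message => println(s"received: $message")
  }
}

object ChannelExample extends App {
  val system = ActorSystem("channel")
  // The producer needs only the Actor's address in order to send.
  val consumer = system.actorOf(Props[Consumer], "consumer")
  consumer ! "hello"
}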

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Message Bus

You have a number of disparate business systems, from purchased commodity applications, to custom developed applications that help achieve competitive advantage, to those of integrating business partners. You need all of these systems to work together, although they run on different platforms and have various service interfaces, and each set of service interfaces specifies a unique data model. Sometimes it can work best to create a Message Bus that implements a simple Service-Oriented Architecture. Such a Message Bus must unify the service interface across all integrated applications, and they must all share a common Canonical Data Model.

This example presents a stock trading system with three subsystems: Stock Trader, Portfolio Manager, and Market Analysis Tools. I create a single Actor that implements the Message Bus, the TradingBus. Each of the subsystems is given an Actor that serves as its connector: StockTrader, PortfolioManager, and MarketAnalysisTools.

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Message Expiration

If it is possible for a given message to become obsolete or in some way invalid due to a time lapse, use a Message Expiration to control the timeout. While we have already dealt with process timeouts in the Scatter-Gather implementation, this is different. A Message Expiration is used to determine when a single message has expired, rather than setting a limit on the completion of a larger process.

When using message-based middleware, it is possible to ask the messaging system itself to expire a message before it is ever delivered. Currently Akka does not support a mailbox that automatically detects expired messages. No worries, we can accomplish that on our own quite easily. We could create a custom mailbox type, or just place the expiration behavior on the message itself. There are advantages to both. I may implement a custom mailbox later (as suggested by Roland Kuhn in the comments), but for now will explain how to do this using a trait for messages. Whether or not the mailbox supports expiring messages, the message itself must supply some parts of the solution.
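
Here is one possible shape of such a trait, just as a rough sketch with illustrative names:

// A rough sketch of an expiring-message trait; illustrative names only.
trait ExpiringMessage {
  // When the message was created, in epoch milliseconds.
  val occurredOn: Long = System.currentTimeMillis

  // How long the message remains valid.
  def timeToLiveMillis: Long

  // The receiver checks this before processing the message.
  def isExpired: Boolean =
    System.currentTimeMillis - occurredOn > timeToLiveMillis
}

case class PlaceOrder(orderId: String, timeToLiveMillis: Long) extends ExpiringMessage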

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Claim Check

When you need to break up a composite message into smaller parts, but provide access to any of the parts on demand, use a Claim Check.

Consider the Claim Check a unique identifier used to store and access a checked item. You pass, as part of a processing message, the identity of the specific checked item: its Claim Check. Each step in the process can use the Claim Check to retrieve all or part of the composite message content as needed.
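
A rough sketch of the idea, with illustrative names only:

import java.util.UUID

// A rough Claim Check sketch: store checked items and hand out a
// unique identifier with which each can be retrieved on demand.
case class ClaimCheck(number: String = UUID.randomUUID().toString)

class ItemChecker {
  private var items = Map.empty[String, Any]

  def checkedItemFor(part: Any): ClaimCheck = {
    val check = ClaimCheck()
    items = items + (check.number -> part)
    check
  }

  // Any step in the process uses the Claim Check to retrieve the part.
  def claimItem(check: ClaimCheck): Option[Any] = items.get(check.number)
}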

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Resequencer

At this point we’ve covered a number of Message Routers in addition to some less complex, but essential, messaging patterns. With regard to my discussion of Request-Response, Ramesh Mandaleeka asked a few questions about Actor Model and message delivery. One of his questions was “How [do you] handle message sequencing?”

With Akka and other Actor systems, in general you do not need to worry about the sequence in which messages will be received by an Actor as a result of direct message sends from another Actor. As discussed in the Akka documentation, direct messages from one Actor to a second Actor are always received in the order in which the first Actor sent them. To repeat just a bit of the Akka documentation here, assume the following:

  • Actor A1 sends messages M1, M2, M3 to A2
  • Actor A3 sends messages M4, M5, M6 to A2

Based on these two scenarios, we arrive at these facts:

  1. If M1 is delivered it must be delivered before M2 and M3
  2. If M2 is delivered it must be delivered before M3
  3. If M4 is delivered it must be delivered before M5 and M6
  4. If M5 is delivered it must be delivered before M6
  5. A2 can see messages from A1 interleaved with messages from A3
  6. Since there is no guaranteed delivery, any of the messages may be dropped, i.e. not arrive at A2

The bottom line here is, don’t be concerned about a sequence of basic messages sent directly from one Actor to another being received out of order. It just won’t happen.

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Scatter-Gather

We’ve actually already stepped through one implementation of Scatter-Gather. This was the combination of Recipient List and Aggregator, which provides the first of two Scatter-Gather variants. The second variant—that of using a Publish-Subscribe Channel to send RequestPriceQuote messages to interested participants—is discussed here.

But wait! The MountaineeringSuppliesOrderProcessor from the Recipient List and Aggregator samples already maintains an interestRegistry. While true, it’s not the same as a Publish-Subscribe Channel. Rather than arbitrarily providing all interested parties with requests for quotation, the MountaineeringSuppliesOrderProcessor from the previous examples ultimately determines which interests will participate in providing quotes. It filters them by means of business rules checked in calculateRecipientList().

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.