Dealing with Device Reads Using Akka

You have a device monitor actor. It must poll the device each time an interval expires. When data starts flowing, it flows a lot. Sometimes, however, the device will not have readable data for long periods of time (several seconds), and continuing to read the device on short intervals takes CPU cycles away from other parts of the actor system that are busy processing the data that has already been taken in. How can we make the device monitor actor more in tune with the way the device operates?

Have the device monitor actor create, as part of its construction, an instance of CappedBackOffScheduler. This scheduler is used to send the “try read” messages to the monitor actor at intervals. The interval starts out at a minimal 500 milliseconds. Any probe of the device that results in a successful read sets the following interval back to 500 milliseconds. Otherwise, each successive failed read doubles the next interval, but only until a time cap is reached. Here is the CappedBackOffScheduler implementation:

import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem}

import scala.concurrent.duration.Duration

class CappedBackOffScheduler(
    minimumInterval: Int,
    maximumInterval: Int,
    system: ActorSystem,
    receiver: ActorRef,
    message: Any) {

  // Execution context required by scheduleOnce
  import system.dispatcher

  var interval = minimumInterval

  // Double the interval (up to the cap) and schedule the next probe
  def backOff = {
    interval = interval * 2
    if (interval > maximumInterval) interval = maximumInterval
    schedule
  }

  // Fall back to the minimum interval and schedule the next probe
  def reset = {
    interval = minimumInterval
    schedule
  }

  private def schedule = {
    val duration = Duration.create(interval, TimeUnit.MILLISECONDS)
    system.scheduler.scheduleOnce(duration, receiver, message)
  }
}

Each time the scheduler is told to backOff, it doubles the interval, at least until it reaches the 15-second cap. When the scheduler is told to reset following a successful read, the interval is (re)set to half a second. Either way, the interval is used to schedule a new “try read” message, which is sent to the device monitor once the interval expires.

Here’s how to use the CappedBackOffScheduler:

import akka.actor.Actor

class DeviceMonitor(device: Device) extends Actor {
  // Probe every 500 ms at minimum, backing off to at most 15 seconds
  val scheduler =
    new CappedBackOffScheduler(
      500,
      15000,
      context.system,
      self,
      TryRead())

  def tryRead = {
    val data = device.readData(500)
    if (data.isDefined) {
      println(s"HIT: ${data.get}")
      scheduler.reset
    } else {
      println("MISS")
      scheduler.backOff
    }
  }

  def receive = {
    case request: TryRead =>
      tryRead
  }
}
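
The listing above assumes a TryRead probe message and a Device abstraction with a timed read, neither of which is shown here. The following is a minimal sketch of what they might look like; the readData() signature and the Option result are assumptions based on how tryRead uses them:

case class TryRead()

trait Device {
  // Attempt a read, blocking for at most timeoutMillis; returns Some(data)
  // on a successful read or None if nothing was readable in time.
  // (The String payload type is only illustrative.)
  def readData(timeoutMillis: Int): Option[String]
}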

Note that the device readData() attempt itself has a timeout; in this case we allow half a second for readData() to succeed. This timeout must be carefully considered, because the thread currently processing the TryRead message will block until the readData() attempt returns. And here’s some sample output:

HIT: ...
HIT: ...
HIT: ...
MISS
MISS
HIT: ...
HIT: ...
HIT: ...
MISS
HIT: ...
HIT: ...
HIT: ...
HIT: ...
MISS
MISS
MISS
MISS
HIT: ...

DDD with Scala and Akka Revisited

There were a number of issues with the code I wrote in the post Using Scala and Akka with Domain-Driven Design. I am addressing those problems in this post and cleaning up the code. The good thing about revisiting this topic is that you can learn from the problems found in the previous post, and how to fix them.

Share Nothing

A major problem in the previous post is that I shared part of an Actor’s internal state with the outside. Recall that one of the basic rules of Actor Model is to share nothing. Oops.

The AggregateCache used a helper named AggregateCacheWorker. The existence of this class in itself is not a problem, especially when it is acting on behalf of the AggregateCache. The problem is that the DomainModel can send a ProvideWorker message to the AggregateCache to request a reference to its internal AggregateCacheWorker. That’s just wrong. The DomainModel should not be able to obtain a reference to any part of the state of AggregateCache.

So how can this problem be corrected? The main clue is to ask: Why did the DomainModel imagine it needed to use the AggregateCacheWorker? The reason is that the DomainModel needed to delegate Aggregate Actor creation to the AggregateCacheWorker because the AggregateCacheWorker holds the ActorContext needed to create a child of the AggregateCache. Yet, this is clearly not the way we should attempt to create new Aggregate Actors.

The solution to the problem actually already exists in the AggregateCache itself. Recall that when a message is sent from a client to an Aggregate Actor, the AggregateCache will check to see if that specific child Actor is currently in memory or not. If the Actor is not currently in memory, the AggregateCache creates the child Aggregate Actor dynamically and then dispatches the message being sent to it by the client. Thus, we can just leverage this AggregateCache behavior to lazily create an Aggregate Actor when the first message is sent to it.

But wait. Isn’t it a problem for the Aggregate Actor not to exist at all until a message is sent to it? Well, not if you consider that an Actor’s state is mutated by its handling of various messages. Therefore, the Actor really has an empty initial state until it receives its first command message. Here’s how it will work:

object DomainModelPrototype extends CompletableApp(1) {

  val model = DomainModel("OrderProcessing")

  model.registerAggregateType("co.vaughnvernon.orderprocessing.domain.model.Order")

  val order = model.aggregateOf("co.vaughnvernon.orderprocessing.domain.model.Order", "123")

  order ! InitializeOrder(249.95)

  ...

Did you notice some fundamental changes to the client code compared to the previous example? First of all, now when registering an Aggregate type we use the fully-qualified class name of the Actor. Then when the Aggregate is created using aggregateOf(), we pass in only the type and the globally unique id. We no longer pass in the Props, because there will purposely be no initial state in the Actor until it receives its first message, which you can see is sent as InitializeOrder just following its creation by aggregateOf().

Improved Solution

So, allow me to reintroduce the Scala classes from the top down, starting with the new DomainModel and companion object:

import akka.actor.{ActorSystem, Props}

object DomainModel {
  def apply(name: String): DomainModel = {
    new DomainModel(name)
  }
}

class DomainModel(name: String) {
  val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  def aggregateOf(typeName: String, id: String): AggregateRef = {
    if (aggregateTypeRegistry.contains(typeName)) {
      val aggregateType = aggregateTypeRegistry(typeName)
      aggregateType.cacheActor ! RegisterAggregateId(id)
      AggregateRef(id, aggregateType.cacheActor)
    } else {
      throw new IllegalStateException(s"DomainModel type registry does not have a $typeName")
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      aggregateTypeRegistry(typeName) = AggregateType(actorRef)
    }
  }

  def shutdown() = {
    system.shutdown()
  }
}

Note that the DomainModel no longer attempts to use the AggregateCacheWorker. In fact, there is no longer such a worker class. Instead aggregateOf() now sends a message to the AggregateCache under which the Aggregate Actor is to exist:

aggregateType.cacheActor ! RegisterAggregateId(id)

This leads to the new implementation of the AggregateCache:

import akka.actor.{Actor, Props}

class AggregateCache(typeName: String) extends Actor {
  val aggregateClass: Class[Actor] = Class.forName(typeName).asInstanceOf[Class[Actor]]
  val aggregateIds = scala.collection.mutable.Set[String]()

  def receive = {
    case message: CacheMessage =>
      val aggregate = context.child(message.id).getOrElse {
        if (!aggregateIds.contains(message.id)) {
          throw new IllegalStateException(s"No aggregate of type $typeName and id ${message.id}")
        } else {
          context.actorOf(Props(aggregateClass), message.id)
        }
      }
      aggregate.tell(message.actualMessage, message.sender)

    case register: RegisterAggregateId =>
      this.aggregateIds.add(register.id)
  }
}

The AggregateCache no longer holds a Map of type names to Props. Instead, it now contains a Set of unique identities for each Aggregate that has been registered. You can see that each identity is registered when a RegisterAggregateId message is sent from the DomainModel and received by the AggregateCache.

Even so, what would happen if the RegisterAggregateId message were not received by the AggregateCache until after the first message is sent from the client to the yet-to-be-created Aggregate Actor? Actually, this is impossible because of a simple rule of Actor Model: when a message is sent to an Actor with a default FIFO mailbox, that message is guaranteed to be received by the Actor before any subsequently sent messages. Thus, when the DomainModel sends RegisterAggregateId to the AggregateCache, there is no way that a subsequent CacheMessage sent to the Aggregate Actor in question will be received by the AggregateCache before the RegisterAggregateId is received.

Now back to the state of the AggregateCache. It also has an aggregateClass instance value, which is created from the typeName. This is possible because the typeName must now be the fully-qualified class name of the Aggregate Actor type. The aggregateClass is used to create the Props instance passed to the actorOf() function of the ActorContext. This allows the Actor to be dynamically created using the specific type for which each specific AggregateCache exists.

The support classes are only slightly different from the previous example:

case class AggregateRef(id: String, cache: ActorRef) {
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }
}

case class AggregateType(cacheActor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case class RegisterAggregateId(id: String)

Finally, here are the changes to the Order Aggregate:

class Order extends Actor {
  var amount: Double = _

  def receive = {
    case init: InitializeOrder =>
      println(s"Initializing Order with $init")
      this.amount = init.amount
    case processOrder: ProcessOrder =>
      println(s"Processing Order is $processOrder")
      DomainModelPrototype.completedStep()
  }
}

case class InitializeOrder(amount: Double)
case class ProcessOrder()

I think this addresses all the issues that were apparent from the original post. Hopefully it has reinforced the basic rule of Actor Model: Share Nothing.

There are still several other pieces of the DDD with Scala and Akka puzzle to snap into place. I’ll be introducing those over the next few weeks. Well, I can only promise that it will be ASAP.

Using Scala and Akka with Domain-Driven Design

If you’ve been following my theme for a while, you know I’ve spent a considerable amount of effort promoting Scala and Akka for use when Implementing Domain-Driven Design. Few seem to share my vision of a completely explicit use of Actors as Aggregates, with direct message sending between clients and the Aggregates (in which case the “client” may be, for example, the User Interface or another Aggregate).

NOTE: There is a follow up to this post where I address the problems found herein.

More recently there have been numerous suggestions to place various wrappers around the domain model’s Aggregates. This smells of Application Layer and its Application Services, or at least like a named cache/region such as is used in Coherence or GemFire. That’s absolutely not what I want. Sure, those mechanisms could exist behind the scenes, but not between a client and the Aggregate instance (an Actor) that it wants to communicate with. The latter makes me really unhappy. The main point of my emphasis on the use of Actor Model in the first place was to get rid of all the extraneous architecture layers that cloud our vision of what the core of the application—its domain model—is really doing.

Rather than abandon Akka as a means to employ Actor Model with DDD, what I did over the past few hours was prototype a few useful abstractions as a really simple set of Scala classes that could be used to implement my vision. So, here you go.

It seems appropriate to me that the basic interface to the desired functionality should be a DomainModel. Here’s a simple class and companion object:

import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps

object DomainModel {
  def apply(name: String): DomainModel = {
    new DomainModel(name)
  }
}

class DomainModel(name: String) {
  val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  def aggregateOf(typeName: String, props: Props, id: String) = {
    if (aggregateTypeRegistry.contains(typeName)) {
      val aggregateType = aggregateTypeRegistry(typeName)
      val worker = aggregateType.worker
      val actorRef = worker.aggregateOf(props, id)
      val cacheActor = aggregateType.actor
      AggregateRef(id, cacheActor)
    } else {
      AggregateRef(null, null)
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      implicit val timeout = Timeout(5 seconds)
      val future = actorRef ? ProvideWorker
      val worker = Await.result(future, timeout.duration).asInstanceOf[AggregateCacheWorker]
      aggregateTypeRegistry(typeName) = AggregateType(worker, actorRef)
    }
  }

  def shutdown() = {
    system.shutdown()
  }
}

After your DomainModel is initialized, the idea is to allow for the registration of various Aggregate types using registerAggregateType(). After you have registered any number of types that are appropriate for your Bounded Context, you can start creating instances of them using aggregateOf(). Here you pass the type name of the Aggregate that you want to create, as well as the Props of the Akka Actor and the id of the Aggregate. The id is both the name of the Actor and the globally unique identity of the Aggregate instance being created. Finally, if your application needs to be shut down, you must call the DomainModel shutdown() function.

Now, what is the reason for registering Aggregate types? This serves a few different purposes. First of all, there is an underlying Actor that implements a special node-specific cache. This cache Actor will serve as the parent of all Aggregate Actors within the registered type. All messages that will be sent to the Actor that implements a specific Aggregate instance must pass through the cache Actor. Here’s the class that serves as the parent of all Aggregate instances within a specified type, or named cache:

class AggregateCache(typeName: String) extends Actor {
  val worker = new AggregateCacheWorker(context)

  def receive = {
    case message: CacheMessage =>
      val aggregate = context.child(message.id).getOrElse {
        context.actorOf(worker.propsFor(message.id), message.id)
      }
      aggregate.tell(message.actualMessage, message.sender)

    case ProvideWorker =>
      sender ! worker
  }
}

As you can see, the main functionality of the AggregateCache is to look up child Aggregate Actors and dispatch messages to them. If the look up fails, it means that the Aggregate Actor instance is no longer in memory. This implies that Aggregates can be transient; that is, they may exist in memory for some period of time, and then after some degree of inactivity they can be closed and removed from memory. However, if a message is sent to the Aggregate Actor by a client, the client needs to be assured that the Aggregate will receive the message no matter what. This is where the getOrElse expression comes into play. The Aggregate Actor will be dynamically reloaded, and then the AggregateCache can dispatch the message to it.

The AggregateCache Actor uses an internal worker object:

class AggregateCacheWorker(context: ActorContext) {
  val aggregateInfo = scala.collection.mutable.Map[String, Props]()

  def aggregateOf(props: Props, id: String): ActorRef = {
    if (!aggregateInfo.contains(id)) {
      aggregateInfo(id) = props
      context.actorOf(props, id)
    } else {
      throw new IllegalStateException(s"Aggregate with id $id already exists")
    }
  }

  def propsFor(id: String): Props = {
    if (aggregateInfo.contains(id)) {
      aggregateInfo(id)
    } else {
      throw new IllegalStateException(s"No Props for aggregate of id $id")
    }
  }
}

The AggregateCacheWorker handles support work that is needed by the DomainModel and AggregateCache. This includes creating new Aggregate instances by way of aggregateOf(), and also providing the Props instance, via propsFor(), for each Aggregate Actor that may need to be dynamically reloaded.

Finally, there are a few simple support classes:

case class AggregateRef(id: String, cache: ActorRef) {
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }
}

case class AggregateType(worker: AggregateCacheWorker, actor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case object ProvideWorker

The AggregateRef is a really important abstraction. Rather than returning an ActorRef when an Aggregate Actor is created, the DomainModel instead returns an AggregateRef. As you can see from its two functions, the AggregateRef knows how to send messages to the underlying Aggregate Actor. However, it’s not a “straight shot,” so to speak, as it is with ActorRef. Instead, all messages sent to Aggregates via AggregateRef’s tell() are first sent to the AggregateCache. This allows for look up and any necessary dynamic loading of the Aggregate Actors into the cache. Messages are sent to AggregateCache as a CacheMessage instance.

So, how does it work? Here’s a runner for the prototype:

class Order(id: String, amount: Double) extends Actor {
  def receive = {
    case p: ProcessOrder =>
      println(s"Processing Order is $p")
      DomainModelPrototype.completedStep()
    case a: Any =>
      println(s"message is $a")
      DomainModelPrototype.completedStep()
  }
}

case class ProcessOrder()

object DomainModelPrototype extends CompletableApp(1) {

  val model = DomainModel("prototype")

  model.registerAggregateType("Order")

  val order = model.aggregateOf("Order", Props(new Order("123", 249.95)), "123")

  order ! ProcessOrder()

  awaitCompletion()

  model.shutdown()

  println("DomainModelPrototype: is completed.")
}

This way the Order Aggregate Actor can, in effect, receive messages directly from a client. This is not actually a direct message send in the sense that ActorRef manages. Even so, ActorRef’s message sends are also not as “direct” as you might like to think. So by means of DomainModel and AggregateRef—the only abstractions of necessity exposed to the client—we get a smooth and seamless interface to Aggregate Actors. Even better, these few abstractions address the way Akka needs to work, but cover over the particulars.

NOTE: If you think the above code is a little hacky, you would be correct. I made some tradeoffs just to express my vision in Scala and Akka without getting bogged down in laborious details. This little bit of code should be revisited for some cleanup. Yet, it still needs a bit more detail for handling distributed caches, etc. Right now it’s just a prototype. What’s good about revisiting the code is that the corrections end up being a good learning example.

Thanks to the Akka Team, including Roland Kuhn and Patrik Nordwall, for ideas about making Akka happy. Their advice was to use some sort of intermediary to dynamically look up child Aggregate Actors, which I implemented as AggregateCache. Yet, it is my use of AggregateRef that allows the intermediary to be removed from the view of clients.

Anyway, I’ve been beating this drum for a while. I feel good that I finally had a few hours to hammer out a prototype solution for this specific set of problems. There is still a large body of work that I have accomplished around DDD with Scala and Akka that I will be presenting ASAP.

EAI Patterns with Actor Model: Message Channel

After a discussion of all those fancy doodah filters, routers, and buses, we are back to the plain old Message Channel. Yes, I did skip the Process Manager. We will get back to it, but for now it seems we all need a bit of boredom. Well, not really.

Sure, a Message Channel is simply the means by which a message producer and a message consumer communicate. Using a typical messaging mechanism, a producer creates a channel with an identity/name that is made known to the one who wants to consume the producer’s messages. The consumer opens the receiver end of the channel, prepared to consume messages. Once the producer sends a message, the consumer receives it.

It works a little differently when using the Actor Model. Each Actor provides its own Message Channel by means of which it will receive messages. This channel is often the Actor’s mailbox, and basically functions as a FIFO queue for all incoming messages. In the case of the Actor Model it’s not the producer that creates this mailbox channel. Rather, it’s the creation of the Actor that causes the underlying conceptual Message Channel to be created. And according to the rules of Actor Model, a message producer must know the address of the Actor—its Message Channel—to which it wishes to send messages.
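
To make that concrete, here is a minimal Akka sketch; the actor and message names are mine, not taken from the full text. Creating the consumer Actor is what brings its Message Channel (its mailbox) into existence, and a producer needs only the consumer’s ActorRef in order to send to that channel:

import akka.actor.{Actor, ActorRef}

// The message type carried over the channel
case class Reading(value: Int)

// Creating this Actor is what creates its Message Channel: its mailbox,
// a FIFO queue of all incoming messages.
class Consumer extends Actor {
  def receive = {
    case Reading(value) => println(s"Consumed reading: $value")
  }
}

// A producer must know the consumer's address (its ActorRef) to send to the channel.
class Producer(consumer: ActorRef) extends Actor {
  def receive = {
    case value: Int => consumer ! Reading(value)
  }
}

Every consumer ! Reading(...) simply enqueues the message on the consumer’s mailbox, which the consumer drains one message at a time.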

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Message Bus

You have a number of disparate business systems, from purchased commodity applications, to custom developed applications that help achieve competitive advantage, to those of integrating business partners. You need all of these systems to work together, although they run on different platforms and have various service interfaces, and each set of service interfaces specifies a unique data model. Sometimes it can work best to create a Message Bus that implements a simple Service-Oriented Architecture. Such a Message Bus must unify the service interface across all integrated applications, and all of the integrated applications must share a common Canonical Data Model.

This example presents a stock trading system with three subsystems: Stock Trader, Portfolio Manager, and Market Analysis Tools. I create a single Actor that implements the Message Bus, the TradingBus. Each of the subsystems is given an Actor that serves as its connector: StockTrader, PortfolioManager, and MarketAnalysisTools.
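
The full implementation is in the book; the following is only a rough sketch of the shape such a TradingBus Actor might take, with RegisterCommandHandler and TradingCommand as illustrative message names of my own. Connectors register interest in command types, and the bus forwards each incoming command to every registered handler:

import akka.actor.{Actor, ActorRef}

// Illustrative messages; the names are assumptions, not the book's protocol.
case class RegisterCommandHandler(commandId: String, handler: ActorRef)
case class TradingCommand(commandId: String, payload: Any)

class TradingBus extends Actor {
  // Command id -> registered connectors (StockTrader, PortfolioManager, MarketAnalysisTools, ...)
  var handlers = Map[String, Vector[ActorRef]]()

  def receive = {
    case RegisterCommandHandler(commandId, handler) =>
      val registered = handlers.getOrElse(commandId, Vector())
      handlers = handlers + (commandId -> (registered :+ handler))

    case command @ TradingCommand(commandId, _) =>
      // The bus unifies the service interface: every registered connector receives
      // the command in canonical form and adapts it to its own subsystem.
      handlers.getOrElse(commandId, Vector()) foreach { _ forward command }
  }
}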

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Message Expiration

If it is possible for a given message to become obsolete or in some way invalid due to a time lapse, use a Message Expiration to control the timeout. While we have already dealt with process timeouts in the Scatter-Gather implementation, this is different. A Message Expiration is used to determine when a single message has expired, rather than setting a limit on the completion of a larger process.

When using message-based middleware, it is possible to ask the messaging system itself to expire a message before it is ever delivered. Currently Akka does not support a mailbox that automatically detects expired messages. No worries, we can accomplish that on our own quite easily. We could create a custom mailbox type, or just place the expiration behavior on the message itself. There are advantages to both. I may implement a custom mailbox later (as suggested by Roland Kuhn in the comments), but for now I will explain how to do this using a trait for messages. Whether or not the mailbox supports expiring messages, the message itself must supply some parts of the solution.
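
As a rough sketch of the trait-based approach (the names occurredOn, timeToLive, and isExpired are my assumptions, not necessarily those used in the full text), an expiring message carries its creation time and a time-to-live, and the receiver simply checks isExpired before doing any work:

import akka.actor.Actor

// A message mixes in this trait to carry its own expiration information.
trait ExpiringMessage {
  val occurredOn: Long = System.currentTimeMillis()
  val timeToLive: Long // how many milliseconds the message remains valid

  def isExpired: Boolean = System.currentTimeMillis() > occurredOn + timeToLive
}

case class PlaceOrder(orderId: String, timeToLive: Long) extends ExpiringMessage

// The receiver drops expired messages instead of processing them.
class OrderProcessor extends Actor {
  def receive = {
    case order: PlaceOrder =>
      if (order.isExpired)
        println(s"OrderProcessor: dropping expired order ${order.orderId}")
      else
        println(s"OrderProcessor: processing order ${order.orderId}")
  }
}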

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Claim Check

When you need to break up a composite message into smaller parts, but provide access to any of the parts on demand, use a Claim Check.

Consider the Claim Check a unique identifier used to store and access a checked item. You pass, as part of a processing message, the identity of the specific checked item: its Claim Check. Each step in the process can use the Claim Check to retrieve all or part of the composite message content as needed.
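
Here is a minimal sketch of the idea; the ItemChecker and its method names are my own and not the implementation from the full text. A checker stores the composite parts under a generated claim check, and each processing step presents the claim check to retrieve only the part it needs:

import java.util.UUID

// Illustrative names; not the implementation from the full text.
case class ClaimCheck(number: String = UUID.randomUUID().toString)

case class CheckedItem(claimCheck: ClaimCheck, businessId: String, parts: Map[String, Any])

class ItemChecker {
  private var checkedItems = Map[ClaimCheck, CheckedItem]()

  // Store the composite parts and hand back the claim check for later retrieval.
  def checkItemFor(businessId: String, parts: Map[String, Any]): ClaimCheck = {
    val claimCheck = ClaimCheck()
    checkedItems = checkedItems + (claimCheck -> CheckedItem(claimCheck, businessId, parts))
    claimCheck
  }

  // A processing step retrieves all or part of the checked item on demand.
  def claimPart(claimCheck: ClaimCheck, partName: String): Option[Any] =
    checkedItems.get(claimCheck).flatMap(_.parts.get(partName))

  // Once the overall process completes, the checked item can be released.
  def removeItem(claimCheck: ClaimCheck): Unit =
    checkedItems = checkedItems - claimCheck
}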

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Resequencer

At this point we’ve covered a number of Message Routers in addition to some less complex, but essential, messaging patterns. With regard to my discussion of Request-Response, Ramesh Mandaleeka asked a few questions about Actor Model and message delivery. One of his questions was “How [do you] handle message sequencing?”

With Akka and other Actor systems, in general you do not need to worry about the sequence in which messages will be received by an Actor as a result of direct message sends from another Actor. As discussed in the Akka documentation, direct messages from one Actor to a second Actor are always received in the order in which the first Actor sent them. To repeat just a bit of the Akka documentation here, assume the following:

  • Actor A1 sends messages M1, M2, M3 to A2
  • Actor A3 sends messages M4, M5, M6 to A2

Based on these two scenarios, we arrive at these facts:

  1. If M1 is delivered it must be delivered before M2 and M3
  2. If M2 is delivered it must be delivered before M3
  3. If M4 is delivered it must be delivered before M5 and M6
  4. If M5 is delivered it must be delivered before M6
  5. A2 can see messages from A1 interleaved with messages from A3
  6. Since there is no guaranteed delivery, any of the messages may be dropped, i.e. not arrive at A2

The bottom line here is, don’t be concerned about a sequence of basic messages sent directly from one Actor to another being received out of order. It just won’t happen.
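
That said, fact 5 above means that messages from different senders can still interleave at the receiver. When a consumer needs one global ordering across senders, a Resequencer can buffer what arrives early and release messages in sequence. The following is only an illustrative sketch with my own names, not the implementation from the full text, and it assumes each message carries an application-assigned sequence number:

import akka.actor.{Actor, ActorRef}

// Assumes the application stamps each message with a sequence number.
case class SequencedMessage(sequence: Long, payload: Any)

// Buffers out-of-order messages and releases them to the consumer in sequence.
class Resequencer(consumer: ActorRef) extends Actor {
  var nextExpected: Long = 1
  var buffered = Map[Long, SequencedMessage]()

  def receive = {
    case message: SequencedMessage =>
      buffered = buffered + (message.sequence -> message)
      dispatchAllInSequence()
  }

  def dispatchAllInSequence(): Unit = {
    while (buffered.contains(nextExpected)) {
      consumer ! buffered(nextExpected).payload
      buffered = buffered - nextExpected
      nextExpected = nextExpected + 1
    }
  }
}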

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Scatter-Gather

We’ve actually already stepped through one implementation of Scatter-Gather. This was the combination of Recipient List and Aggregator, which provides the first of two Scatter-Gather variants. The second variant—that of using a Publish-Subscribe Channel to send RequestPriceQuote messages to interested participants—is discussed here.

But wait! The MountaineeringSuppliesOrderProcessor from the Recipient List and Aggregator samples already maintains an interestRegistry. While true, it’s not the same as a Publish-Subscribe Channel. Rather than arbitrarily providing all interested parties with requests for quotation, the MountaineeringSuppliesOrderProcessor from the previous examples ultimately determines which interests will participate in providing quotes. It filters them by means of business rules checked in calculateRecipientList().
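
For contrast, a bare-bones Publish-Subscribe Channel in Akka might look like the following sketch. The names are my own (only RequestPriceQuote and rfqId come from the earlier examples, and the remaining message fields are assumptions); the point is that every subscriber receives every published request, with no business-rule filtering by the publisher:

import akka.actor.{Actor, ActorRef}

case class Subscribe(subscriber: ActorRef)
case class RequestPriceQuote(rfqId: String, itemId: String, retailPrice: Double)

// Unlike a Recipient List, the channel does no filtering: every subscriber
// receives every RequestPriceQuote that is published to it.
class PriceQuoteChannel extends Actor {
  var subscribers = Vector[ActorRef]()

  def receive = {
    case Subscribe(subscriber) =>
      subscribers = subscribers :+ subscriber

    case request: RequestPriceQuote =>
      subscribers foreach { _ forward request }
  }
}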

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Aggregator

The Recipient List example didn’t demonstrate how the PriceQuote replies are assimilated by the MountaineeringSuppliesOrderProcessor. To correlate PriceQuote replies to the original RequestForQuotation we need to use the unique rfqId that has been passed along with each message:

orderProcessor ! RequestForQuotation("123", ...)
...
recipient ! RequestPriceQuote(rfq.rfqId, ...)
...
sender ! PriceQuote(rpq.rfqId, ...)

The previous example from Recipient List is extended here to include an Aggregator, which tracks the fulfillment of all requested price quotes. First note the new message types:

case class PriceQuoteFulFilled(priceQuote: PriceQuote)

case class RequiredPriceQuotesForFulfillment(rfqId: String, quotesRequested: Int)

case class QuotationFulfillment(rfqId: String, quotesRequested: Int, priceQuotes: Seq[PriceQuote], requester: ActorRef)
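
Based only on the message types above, the aggregation step might be sketched as follows; the details differ from the implementation in the full text. The aggregator records how many quotes each RFQ requires, accumulates PriceQuoteFulFilled replies into a QuotationFulfillment, and delivers the completed fulfillment to the requester once all quotes have arrived:

import akka.actor.Actor

// Sketch of the aggregation bookkeeping; details differ from the full text.
class PriceQuoteAggregator extends Actor {
  val fulfilledPriceQuotes = scala.collection.mutable.Map[String, QuotationFulfillment]()

  def receive = {
    case required: RequiredPriceQuotesForFulfillment =>
      // Remember how many quotes this RFQ is waiting for, and for whom.
      fulfilledPriceQuotes(required.rfqId) =
        QuotationFulfillment(required.rfqId, required.quotesRequested, Vector(), sender)

    case fulfilled: PriceQuoteFulFilled =>
      val previous = fulfilledPriceQuotes(fulfilled.priceQuote.rfqId)
      val current = previous.copy(priceQuotes = previous.priceQuotes :+ fulfilled.priceQuote)

      if (current.priceQuotes.size >= current.quotesRequested) {
        // All quotes are in: deliver the completed fulfillment and stop tracking it.
        current.requester ! current
        fulfilledPriceQuotes.remove(current.rfqId)
      } else {
        fulfilledPriceQuotes(current.rfqId) = current
      }
  }
}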
. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.