Dealing with Device Reads Using Akka

You have a device monitor actor. It must poll the device every time an interval expires. When data starts flowing, it flows a lot. Sometimes, however, the device will not have readable data for long periods of time (several seconds), and continuing to read the device on short intervals will take CPU core cycles away from other parts of the actor system that are busy processing the data that has already been taken in. How can we make the device monitor actor be more in tune with the way the device operates?

Have the device monitor actor create, as part of its construction, an instance of CappedBackOffScheduler. This scheduler sends the “try read” messages to the monitor actor at intervals. The interval starts at a minimum of 500 milliseconds. After any probe of the device that results in a successful read, the next interval is 500 milliseconds. Otherwise, each successive failed read doubles the next interval, but only until a time cap is reached. Here is the CappedBackOffScheduler implementation:

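import java.util.concurrent.TimeUnit

import akka.actor.{ActorRef, ActorSystem}
import scala.concurrent.duration.Duration

class CappedBackOffScheduler(
    minimumInterval: Int,
    maximumInterval: Int,
    system: ActorSystem,
    receiver: ActorRef,
    message: Any) {

  // Newer Akka versions require an implicit ExecutionContext for scheduleOnce;
  // using the system dispatcher here is an assumption.
  import system.dispatcher

  // Current delay in milliseconds before the next message is sent.
  var interval = minimumInterval

  // Doubles the interval, capped at maximumInterval, then schedules the message.
  def backOff = {
    interval = math.min(interval * 2, maximumInterval)
    schedule
  }

  // Returns to the minimum interval, then schedules the message.
  def reset = {
    interval = minimumInterval
    schedule
  }

  // Sends the message to the receiver once, after the current interval.
  private def schedule = {
    val duration = Duration.create(interval, TimeUnit.MILLISECONDS)
    system.scheduler.scheduleOnce(duration, receiver, message)
  }
}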

Each time the scheduler is told to backOff, it doubles the interval until the interval reaches the cap, which is 15 seconds in the usage below. When the scheduler is told to reset following each successful read, the interval is (re)set to the minimum, which is half a second in the usage below. Either way, the interval is used to schedule a new “try read” message. The message will be sent to the device monitor once the interval expires.
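To make the progression concrete, here is a minimal sketch of the same doubling-with-a-cap rule as a pure function, without Akka. The object and method names (BackOffIntervals, nextAfterMiss, afterMisses) are made up for this illustration and are not part of the scheduler above.

```scala
object BackOffIntervals {
  // Same rule as backOff: double the interval on a miss, capped at the maximum.
  def nextAfterMiss(current: Int, maximum: Int): Int =
    math.min(current * 2, maximum)

  // Intervals produced by `misses` consecutive failed reads,
  // starting from the minimum interval.
  def afterMisses(minimum: Int, maximum: Int, misses: Int): List[Int] =
    Iterator.iterate(minimum)(nextAfterMiss(_, maximum))
      .drop(1)      // skip the starting interval itself
      .take(misses)
      .toList
}
```

With a minimum of 500 and a cap of 15000, BackOffIntervals.afterMisses(500, 15000, 6) yields List(1000, 2000, 4000, 8000, 15000, 15000). The fifth miss would double 8000 to 16000, so the cap applies from that point on.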

Here’s how to use the CappedBackOffScheduler:

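import akka.actor.Actor

// Message that prompts a read attempt (definition assumed; not shown in the original).
case class TryRead()

// Device is the author's type and is not shown here; readData(timeoutMillis) is
// assumed to return an Option holding the data that was read.
class DeviceMonitor(device: Device) extends Actor {
  val scheduler =
    new CappedBackOffScheduler(
      500,    // minimum interval in milliseconds
      15000,  // maximum interval in milliseconds
      context.system,
      self,
      TryRead())

  // Note: nothing shown here sends the first TryRead message.
  def tryRead = {
    // Blocks for up to 500 milliseconds waiting for data.
    val data = device.readData(500)
    if (data.isDefined) {
      println(s"HIT: ${data.get}")
      scheduler.reset
    } else {
      println("MISS")
      scheduler.backOff
    }
  }

  def receive = {
    case request: TryRead =>
      tryRead
  }
}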

Note that the device readData() attempt itself has a timeout. In this case we allow half a second for the readData() to succeed. This timeout must be carefully considered because the thread currently processing the TryRead message will block until the readData() attempt returns. And here’s some sample output:

HIT: ...
HIT: ...
HIT: ...
MISS
MISS
HIT: ...
HIT: ...
HIT: ...
MISS
HIT: ...
HIT: ...
HIT: ...
HIT: ...
MISS
MISS
MISS
MISS
HIT: ...

EAI Patterns with Actor Model: Message Channel

After a discussion of all those fancy doodahdah filters, routers, and buses, we are back to the plain old Message Channel. Yes, I did skip the Process Manager. We will get back to it, but for now it seems we all need a bit of boredom. Well, not really.

Sure, a Message Channel is simply the means by which a message producer and a message consumer communicate. Using a typical messaging mechanism, a producer creates a channel with an identity/name that is made known to the one who wants to consume the producer’s messages. The consumer opens the receiver end of the channel, prepared to consume messages. Once the producer sends a message, the consumer receives it.

It works a little differently when using the Actor Model. Each Actor provides its own Message Channel by means of which it will receive messages. This channel is often the Actor’s mailbox, and basically functions as a FIFO queue for all incoming messages. In the case of the Actor Model it’s not the producer that creates this mailbox channel. Rather, it’s the creation of the Actor that causes the underlying conceptual Message Channel to be created. And according to the rules of Actor Model, a message producer must know the address of the Actor—its Message Channel—to which it wishes to send messages.

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Message Bus

You have a number of disparate business systems, from purchased commodity applications, to custom developed applications that help achieve competitive advantage, to those of integrating business partners. You need all of these systems to work together, although they run on different platforms and have various service interfaces, and each set of service interfaces specifies a unique data model. Sometimes it can work best to create a Message Bus that implements a simple Service-Oriented Architecture. Such a Message Bus must unify the service interface across all integrated applications, and they must all share a common Canonical Data Model.

This example presents a stock trading system with three subsystems: Stock Trader, Portfolio Manager, and Market Analysis Tools. I create a single Actor that implements the Message Bus, the TradingBus. Each of the subsystems is given an Actor that serves as its connector: StockTrader, PortfolioManager, and MarketAnalysisTools.

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Message Expiration

If it is possible for a given message to become obsolete or in some way invalid due to a time lapse, use a Message Expiration to control the timeout. While we have already dealt with process timeouts in the Scatter-Gather implementation, this is different. A Message Expiration is used to determine when a single message has expired, rather than setting a limit on the completion of a larger process.

When using message-based middleware, it is possible to ask the messaging system itself to expire a message before it is ever delivered. Currently Akka does not support a mailbox that automatically detects expired messages. No worries, we can accomplish that on our own quite easily. We could create a custom mailbox type, or just place the expiration behavior on the message itself. There are advantages to both. I may implement a custom mailbox later (as suggested by Roland Kuhn in the comments), but for now will explain how to do this using a trait for messages. Whether or not the mailbox supports expiring messages, the message itself must supply some parts of the solution.
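As a rough idea of what placing the expiration behavior on the message could look like, here is a minimal sketch in plain Scala. The trait name ExpiringMessage, its members, and the PlaceOrder message are all hypothetical, chosen only for illustration; they are not taken from the full text.

```scala
// Hypothetical trait: the message carries its own creation time and time-to-live.
trait ExpiringMessage {
  def occurredOn: Long   // creation time, in epoch milliseconds
  def timeToLive: Long   // validity window, in milliseconds

  // True once more than timeToLive milliseconds have passed since occurredOn.
  // `now` defaults to the current clock but can be supplied for testing.
  def isExpired(now: Long = System.currentTimeMillis()): Boolean =
    now - occurredOn > timeToLive
}

// Hypothetical message type used only for illustration.
case class PlaceOrder(orderId: String, occurredOn: Long, timeToLive: Long)
    extends ExpiringMessage
```

A receiver would check isExpired() before acting on the message and discard or divert it when the check is true. For example, a PlaceOrder created at time 1000 with a timeToLive of 500 is not expired at time 1400 but is expired at time 1600.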

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Claim Check

When you need to break up a composite message into smaller parts, but provide access to any of the parts on demand, use a Claim Check.

Consider the Claim Check a unique identifier used to store and access a checked item. As part of a processing message you pass the identity of the specific checked item: its Claim Check. Each step in the process can use the Claim Check to retrieve all or part of the composite message content as needed.
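The idea can be sketched in plain Scala as a small store keyed by the Claim Check. Everything named here (ClaimCheck, CompositeMessage, ItemChecker and its methods) is an assumption made for illustration, not the implementation from the full text. The store is not thread-safe, so in an actor system it would be owned by a single actor.

```scala
import java.util.UUID

// Hypothetical types for illustration only.
case class ClaimCheck(id: String)
case class CompositeMessage(parts: Map[String, Any])

class ItemChecker {
  private var checkedItems = Map.empty[ClaimCheck, CompositeMessage]

  // Stores the composite message and returns the claim check that identifies it.
  def checkIn(message: CompositeMessage): ClaimCheck = {
    val check = ClaimCheck(UUID.randomUUID().toString)
    checkedItems += (check -> message)
    check
  }

  // Retrieves one named part of the checked message, if both exist.
  def claimPart(check: ClaimCheck, partName: String): Option[Any] =
    checkedItems.get(check).flatMap(_.parts.get(partName))

  // Removes and returns the checked message once processing no longer needs it.
  def checkOut(check: ClaimCheck): Option[CompositeMessage] = {
    val item = checkedItems.get(check)
    checkedItems -= check
    item
  }
}
```

Each processing step then receives only the small ClaimCheck in its message and calls claimPart for the part it needs, rather than carrying the whole composite message along.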

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Resequencer

At this point we’ve covered a number of Message Routers in addition to some less complex, but essential, messaging patterns. With regard to my discussion of Request-Response, Ramesh Mandaleeka asked a few questions about Actor Model and message delivery. One of his questions was “How [do you] handle message sequencing?”

With Akka and other Actor systems, in general you do not need to worry about the sequence in which messages will be received by an Actor as a result of direct message sends from another Actor. As discussed in the Akka documentation, direct messages from one Actor to a second Actor are always received in the order in which the first Actor sent them. To repeat just a bit of the Akka documentation here, assume the following:

  • Actor A1 sends messages M1, M2, M3 to A2
  • Actor A3 sends messages M4, M5, M6 to A2

Based on these two scenarios, we arrive at these facts:

  1. If M1 is delivered it must be delivered before M2 and M3
  2. If M2 is delivered it must be delivered before M3
  3. If M4 is delivered it must be delivered before M5 and M6
  4. If M5 is delivered it must be delivered before M6
  5. A2 can see messages from A1 interleaved with messages from A3
  6. Since there is no guaranteed delivery, any of the messages may be dropped, i.e. not arrive at A2

The bottom line here is, don’t be concerned about a sequence of basic messages sent directly from one Actor to another being received out of order. It just won’t happen.

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Scatter-Gather

We’ve actually already stepped through one implementation of Scatter-Gather. This was the combination of Recipient List and Aggregator, which provides the first of two Scatter-Gather variants. The second variant—that of using a Publish-Subscribe Channel to send RequestPriceQuote messages to interested participants—is discussed here.

But wait! The MountaineeringSuppliesOrderProcessor from the Recipient List and Aggregator samples already maintains an interestRegistry. While true, it’s not the same as a Publish-Subscribe Channel. Rather than arbitrarily providing all interested parties with requests for quotation, the MountaineeringSuppliesOrderProcessor from the previous examples ultimately determines which interests will participate in providing quotes. It filters them by means of business rules checked in calculateRecipientList().

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Aggregator

The Recipient List example didn’t demonstrate how the PriceQuote replies are assimilated by the MountaineeringSuppliesOrderProcessor. To correlate PriceQuote replies to the original RequestForQuotation we need to use the unique rfqId that has been passed along with each message:

orderProcessor ! RequestForQuotation("123", ...)
...
recipient ! RequestPriceQuote(rfq.rfqId, ...)
...
sender ! PriceQuote(rpq.rfqId, ...)

The previous example from Recipient List is extended here to include an Aggregator, which tracks the fulfillment of all requested price quotes. First note the new message types:

case class PriceQuoteFulFilled(priceQuote: PriceQuote)

case class RequiredPriceQuotesForFulfillment(rfqId: String, quotesRequested: Int)

case class QuotationFulfillment(rfqId: String, quotesRequested: Int, priceQuotes: Seq[PriceQuote], requester: ActorRef)
. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Recipient List

We’ve looked at a few different kinds of Content-Based Routers, and Recipient List is yet another. A Recipient List can be compared to the To: and Cc: fields in an email message, where you specify any number of intended recipients of the email message. Thus, a Recipient List may be predetermined depending on the kind of message being sent. Yet, it is also possible for a Recipient List to take on the characteristics of a Dynamic Router in that the recipients may be determined by some set of business rules.

The example provided here performs price quoting. When the MountaineeringSuppliesOrderProcessor receives a RequestForQuotation message, it calculates a Recipient List based on a set of business rules, which means it is a kind of Dynamic Router.
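To illustrate how a Recipient List can be calculated from business rules, here is a minimal sketch in plain Scala. The PriceQuoteInterest type, the price-range rule, and the signature of calculateRecipientList are assumptions made for this illustration; the actual rules and types are in the full text.

```scala
// Hypothetical registration: a supplier quotes only on orders whose total
// retail price falls within its declared range.
case class PriceQuoteInterest(
    supplier: String,
    lowTotalRetail: BigDecimal,
    highTotalRetail: BigDecimal)

object RecipientListSketch {
  // Assumed business rule: keep only the suppliers whose range
  // contains the order's total retail price.
  def calculateRecipientList(
      totalRetailPrice: BigDecimal,
      interestRegistry: Seq[PriceQuoteInterest]): Seq[String] =
    interestRegistry.collect {
      case interest
          if totalRetailPrice >= interest.lowTotalRetail &&
            totalRetailPrice <= interest.highTotalRetail =>
        interest.supplier
    }
}
```

Because the list is computed per request, two RequestForQuotation messages with different totals can reach different sets of recipients, which is what makes this a kind of Dynamic Router.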

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.

EAI Patterns with Actor Model: Routing Slip

Use a Routing Slip when a large business procedure logically does one thing but physically requires a series of processing steps. This achieves a service composition commonly recognized as SOA. In our Routing Slip process, each step is handled by an individual Actor. The process of this example is customer registration, as described by [Hohpe & Woolf], which includes the following steps:

  • Create a new customer
  • Record the customer’s contact information
  • Request a service plan for the customer
  • Run a credit check for the new customer

First let’s look at the top of RoutingSlip.scala and the Value Objects that comprise the data body of the registration message:

. . .

The full text is now available as part of my book Reactive Enterprise with Actor Model on Safari Books Online.