Summary of CRDTs

In this article I am going to introduce you to CRDTs, or Conflict-Free Replicated Data Types. You will see under what conditions CRDTs might be useful, why you might want to use such a data type, what kinds of CRDTs are available, and how you can use them.

Think of having a Shopping Cart persisted by a multi-node replicated store, and then having one of the nodes become unavailable. If you have at least 3 nodes in the store, which of the remaining nodes contains the most recent version of a given user’s Shopping Cart? You need to display the most recent one. How would you know? What can you do to resolve this problem when the user is just ready to check out and pay?

Semantic Resolution

One typical approach is to use what is called Semantic Resolution. In this case you take all copies of the Shopping Cart from each of the remaining storage replicas and apply some “business logic” to the situation. The business says that we want to make the maximum sale, so the “logical” thing to do is merge the two remaining Shopping Carts so that the “official” Shopping Cart has the maximum number of products, each with the maximum count of items, from both. However, if the user has just recently removed an item from their Shopping Cart, then merging that item back into their possession looks like an error, and is at least annoying. In the worst case the user ends up not noticing the results of the merge, buys the extra item, and then returns it for their money back. This is probably much more annoying than noticing the extra item in the Shopping Cart and removing it before final check out.

Isn’t there a better way?

Introducing CRDTs

A CRDT is literally a data type, and there are two basic ways to implement one. Both of these approaches adhere to the CRDT acronym because both are named starting with the letter C:

  1. Convergent Replicated Data Types: This is where data converges, or merges. You focus on the data on each of the storage replicas and merge the data changes between each of the replicas in order to bring each replica up to date. Obviously to make this work well you must in some way time stamp the data so you can merge in a proper way. You could use a simple time stamp, but you may have to use Vector Clocks or some similar approach to get this to work out well.
  2. Commutative Replicated Data Types: Maybe you remember the Commutative Property from mathematics. For example, you can take two numbers, such as 2 and 3, and whichever way you add them together, the result is always 5. That is, 2+3 yields the same value as 3+2. In this case we are more focused on the operations rather than the data, and we communicate the operations to each of the replicas rather than merging data. Regardless of the data they currently have, applying the operations they receive will result in the correct state. Commutative types work well when your messaging supports at-least-once delivery, such as is the case with Akka Persistence.

In the following examples we will discuss both Convergent and Commutative types.

One Convergent-Only Replicated Data Type

Here I will discuss only one Convergent-only type, the Register. A Register type has only two operations that you can perform on it: Set and Get. Because the only modification that you can make to a Register is to set its current value, it cannot be managed concurrently as a Commutative type.

There are two ways to implement a Register:

  1. Last-Write-Wins (LWW): Here you keep the value and a timestamp of when the value was set on a given replica. The timestamp allows the value to be merged on any given replica by comparing the timestamp of the local value with that of the value to be merged. If the timestamp of the local value is older than that of the sent value, then the local value is updated.
  2. Multi-Value: Here you may keep multiple copies of the same conceptual value, but maintain information that indicates when the value came to be set. It’s possible to hand to a client multiple values of the same thing and allow the client to apply conflict resolution to determine which of the multiple values it shall choose as “current.”

There are advantages to both of these approaches. Multi-Value is the most versatile, but also the most complex, approach. Although a Multi-Value Register may seem to have the same nature as the poor merging example given under Semantic Resolution, the data that indicates the differences can be based on alternative criteria such as Version Vectors or a Vector Clock.
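
To make the Register more concrete, here is a minimal sketch of a Last-Write-Wins Register. The Java class and method names are mine, not from any particular CRDT library, and it assumes the caller supplies a meaningful timestamp greater than zero (a wall-clock time or a logical clock value):

public final class LWWRegister<T> {

    // The value with the newest timestamp wins; a stale or equal
    // timestamp leaves the local value untouched.
    private T value;
    private long timestamp;

    public void set(T aValue, long aTimestamp) {
        if (aTimestamp > this.timestamp) {
            this.value = aValue;
            this.timestamp = aTimestamp;
        }
    }

    public T get() {
        return this.value;
    }

    // Merge the state received from another replica.
    public void merge(LWWRegister<T> other) {
        this.set(other.value, other.timestamp);
    }
}

A production implementation would pair the timestamp with a replica identifier, or use a Vector Clock, so that ties and clock skew are resolved deterministically on every replica.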

Some Commutative Replicated Data Types

Here we are going to discuss several CRDTs:

  1. Counters: G-Counters and PN-Counters
  2. Sets: G-Sets, 2P-Sets, U-Set, LWW-Element-Sets, and OR-Sets
  3. Graphs

Let’s take a look at these one by one.

Counters

There are two types of counters:

  1. G-Counter: This is a grow-only counter, one that only understands addition. The implementation of a G-Counter basically implements a Version Vector or Vector Clock.
  2. PN-Counter: This is a positive-negative counter, one that understands how to both add and subtract. To support this there are actually two G-Counters that are maintained: a Positive G-Counter and a Negative G-Counter.

To explain how G-Counter works, let’s consider its operations on two replicas:

  • R1 increments the G-Counter A, yielding a value of 1
  • R1 increments the G-Counter A again, yielding a value of 2
  • R2 increments the G-Counter A for the first time, yielding a value of 1

Now the increment operations are sent from R1 to R2, and from R2 to R1:

  • R2 receives an increment notification from R1 and increments G-Counter A from 1 to 2
  • R2 receives an increment notification from R1 and increments G-Counter A from 2 to 3
  • R1 receives an increment notification from R2 and increments G-Counter A from 2 to 3
  • Both R1 and R2 are now up to date with an A value of 3
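
Viewed as a Convergent (state-based) type, the same G-Counter can be sketched as a map of per-replica counts whose merge takes the maximum of each entry. This is an illustrative Java sketch of my own, not code from any specific CRDT library:

import java.util.HashMap;
import java.util.Map;

// A grow-only counter: each replica increments only its own slot, and a
// merge takes the maximum of each slot, so no increment is ever lost and
// repeated merges are harmless (idempotent).
public final class GCounter {

    private final String replicaId;
    private final Map<String, Long> counts = new HashMap<>();

    public GCounter(String aReplicaId) {
        this.replicaId = aReplicaId;
    }

    public void increment() {
        counts.merge(replicaId, 1L, Long::sum);
    }

    public long value() {
        return counts.values().stream().mapToLong(Long::longValue).sum();
    }

    public void merge(GCounter other) {
        other.counts.forEach((id, count) -> counts.merge(id, count, Math::max));
    }
}

Replaying the steps above: R1 increments twice and R2 once; after R1 merges R2 and R2 merges R1, both replicas report a value of 3.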

Now consider an example of how the PN-Counter works between two replicas:

  • R1 increments PN-Counter B, yielding a PN value of { 1, 0 }
  • R1 increments PN-Counter B, yielding a PN value of { 2, 0 }
  • R2 increments PN-Counter B, yielding a PN value of { 1, 0 }
  • R2 decrements PN-Counter B, yielding a PN value of { 1, 1 }

Now the increment and decrement operations are sent from R1 to R2 and from R2 to R1:

  • R2 receives an increment notification from R1 and increments its P counter from 1 to 2, yielding { 2, 1 }
  • R2 receives an increment notification from R1 and increments its P counter from 2 to 3, yielding { 3, 1 }
  • R1 receives an increment notification from R2 and increments its P counter from 2 to 3, yielding { 3, 0 }
  • R1 receives a decrement notification from R2 and increments its N counter from 0 to 1, yielding { 3, 1 }
  • Both R1 and R2 are now up to date with a value of B { 3, 1 }. The actual value of the PN-Counter B is yielded by subtracting the N value from the P value. So 3-1 is 2, which is the actual value of PN-Counter B.

You can use a PN-Counter to hold the count of each of the product items that are currently in the Shopping Cart. In case the counter yields a value less than 0, you can always evaluate it as 0 and not show the product in the Shopping Cart.
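
Assuming the GCounter sketch above, a PN-Counter can be composed of two grow-only counters, one for increments and one for decrements. For the Shopping Cart case the caller would simply clamp the reported value at zero, as just mentioned:

// A positive-negative counter built from two grow-only counters.
// The effective value is increments minus decrements; callers such as
// the Shopping Cart can clamp a negative result at zero.
public final class PNCounter {

    private final GCounter increments;
    private final GCounter decrements;

    public PNCounter(String aReplicaId) {
        this.increments = new GCounter(aReplicaId);
        this.decrements = new GCounter(aReplicaId);
    }

    public void increment() {
        increments.increment();
    }

    public void decrement() {
        decrements.increment();
    }

    public long value() {
        return increments.value() - decrements.value();
    }

    public void merge(PNCounter other) {
        increments.merge(other.increments);
        decrements.merge(other.decrements);
    }
}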

There are other types of Counters available, but not covered here.

Sets

A G-Set, or a grow-only set, is implemented very much like a G-Counter. Again, we will examine the use of a G-Set on two replicas:

  • R1 adds the value 1 to G-Set C, yielding a set of { 1 }
  • R1 adds the value 2 to G-Set C, yielding a set of { 1, 2 }
  • R2 adds the value 3 to G-Set C, yielding a set of { 3 }

Now the add operations are sent from R1 to R2, and from R2 to R1:

  • R2 receives an [ add 1 ] notification from R1 and R2 adds the value 1 to G-Set C, yielding { 1, 3 }
  • R2 receives an [ add 2 ] notification from R1 and R2 adds the value 2 to G-Set C, yielding { 1, 2, 3 }
  • R1 receives an [ add 3 ] notification from R2 and R1 adds the value 3 to G-Set C, yielding { 1, 2, 3 }
  • Both R1 and R2 are now up to date with a set of C as { 1, 2, 3 }

There are other kinds of sets supported, but I don’t cover them extensively here.

There is a 2P-Set, which is a 2-phase set. It is managed much the same way as a PN-Counter; that is, there is a G-Set of added elements and a G-Set of removed elements, which are merged on each replica to yield the actual up-to-date set. You cannot add an element to the removed set unless it exists locally in your added set, which effectively means you cannot remove an element that has not already been added. The remove set functions as a tombstone set, which prevents removed items from ever being added back into the merged set. Be careful what you remove because it can never come back.
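
A 2P-Set can be sketched along the same lines, as two grow-only collections; once an element lands in the tombstone set, lookup will never see it again. Again, this is only an illustration of my own:

import java.util.HashSet;
import java.util.Set;

// A two-phase set: a grow-only set of added elements plus a grow-only
// tombstone set of removed elements. An element is "in" the set only if
// it was added and never removed; a removed element can never return.
public final class TwoPhaseSet<E> {

    private final Set<E> added = new HashSet<>();
    private final Set<E> removed = new HashSet<>();

    public void add(E element) {
        added.add(element);
    }

    public boolean remove(E element) {
        // Only an element that is currently observable locally may be removed.
        if (!lookup(element)) {
            return false;
        }
        removed.add(element);
        return true;
    }

    public boolean lookup(E element) {
        return added.contains(element) && !removed.contains(element);
    }

    public void merge(TwoPhaseSet<E> other) {
        added.addAll(other.added);
        removed.addAll(other.removed);
    }
}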

In a U-Set each element is added with a unique identity and you can only remove an element by using its unique identity. A remove must be performed as a causal operation, since it can only follow the add of the element with the same identity.

There is also the LWW-Element-Set, where each of the items added to the set and removed from the set have a timestamp. The final current set is yielded by adding all the most recently added items and removing the most recently removed items. This allows removed elements to be added back to the LWW-Element-Set, which is not possible with the 2P-Set.
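
Here is a comparable sketch of an LWW-Element-Set. It keeps the latest add and remove timestamps per element and, as an arbitrary choice for this illustration, biases toward the add when the two timestamps are equal:

import java.util.HashMap;
import java.util.Map;

// An LWW-Element-Set: each element carries the timestamp of its latest
// add and latest remove. Unlike the 2P-Set, a removed element can be
// brought back simply by adding it again with a newer timestamp.
public final class LWWElementSet<E> {

    private final Map<E, Long> addTimes = new HashMap<>();
    private final Map<E, Long> removeTimes = new HashMap<>();

    public void add(E element, long timestamp) {
        addTimes.merge(element, timestamp, Math::max);
    }

    public void remove(E element, long timestamp) {
        removeTimes.merge(element, timestamp, Math::max);
    }

    public boolean lookup(E element) {
        Long addedAt = addTimes.get(element);
        Long removedAt = removeTimes.get(element);
        // Add bias: an equal timestamp keeps the element in the set.
        return addedAt != null && (removedAt == null || addedAt >= removedAt);
    }

    public void merge(LWWElementSet<E> other) {
        other.addTimes.forEach((e, t) -> addTimes.merge(e, t, Math::max));
        other.removeTimes.forEach((e, t) -> removeTimes.merge(e, t, Math::max));
    }
}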

An OR-Set is an observed-remove set, where each element added to the set is assigned a unique identity. This set is kept up to date by applying causal consistency. If the same element is added twice, each time with a different identity, both additions are kept, although a get (look up) of the element will yield only one element, as the second is masked as required by the definition of a set. When the remove of a specific element is requested, all identities of that element present at the given source replica are matched, and those identities are then used to remove the element from all replicas. A remove will only be performed if it historically and causally follows an add of the same element (with the same identity), which conforms to the sequential specification of a set. In other words, there must be an element matched at the source replica for the operation to work across all replicas.
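
A rough sketch of an OR-Set follows. Every add is tagged with a unique identity, and a remove tombstones only the tags observed at the source replica, so a concurrent add carrying a fresh tag survives the remove. Tagging with random UUIDs is my simplification of the unique identities described above:

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// An observed-remove set: each add is recorded with a unique tag, and a
// remove tombstones only the tags that the source replica has observed.
public final class ORSet<E> {

    private final Map<E, Set<UUID>> elements = new HashMap<>();
    private final Map<E, Set<UUID>> tombstones = new HashMap<>();

    public void add(E element) {
        elements.computeIfAbsent(element, e -> new HashSet<>()).add(UUID.randomUUID());
    }

    public void remove(E element) {
        Set<UUID> observed = elements.get(element);
        if (observed != null) {
            tombstones.computeIfAbsent(element, e -> new HashSet<>()).addAll(observed);
        }
    }

    public boolean lookup(E element) {
        Set<UUID> live = new HashSet<>(elements.getOrDefault(element, Collections.emptySet()));
        live.removeAll(tombstones.getOrDefault(element, Collections.emptySet()));
        return !live.isEmpty();
    }

    public void merge(ORSet<E> other) {
        other.elements.forEach((e, tags) ->
                elements.computeIfAbsent(e, k -> new HashSet<>()).addAll(tags));
        other.tombstones.forEach((e, tags) ->
                tombstones.computeIfAbsent(e, k -> new HashSet<>()).addAll(tags));
    }
}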

You could use a 2P-Set to hold the products that are currently in the Shopping Cart, but the problem here is that if the user removed a product and then wanted to add it back, it would not be possible. For that reason a U-Set, LWW-Element-Set, or an OR-Set would be better.

Graphs

Although you might think of graphs as being very powerful, there are problems with applying Graph CRDTs in a distributed system. For this reason I suggest referring to the paper below if you think you are interested in using the Graph type.

Yeah, But…

You ask: “Why wouldn’t you use a Value Object with a built-in time stamp for the Shopping Cart scenario described above? After all, it’s just a matter of figuring out which replicated Shopping Cart has the most recent data.”

First of all, modeling this as a Value Object with a time stamp would work. It’s just that there may be advantages in using CRDTs. Consider some of the following reasons.

In the case of the Shopping Cart the main advantage is redundant data suppression. Only the changes made to the Shopping Cart must be copied to the other nodes, and because most of the data never makes the notification journey, bringing all nodes up to date is faster and more likely to succeed quickly. Then if the node hosting the most up-to-date Shopping Cart is lost, chances are better that the smaller replication payload has already allowed the other nodes to be brought up to date.

On the other hand, the Shopping Cart is probably not the best example for reasoning about CRDTs. The limitation of the Shopping Cart example is that it is owned by only one user, and most changes will probably be saved to the same node first and then propagated to other nodes. However, if you think of an Aggregate that can be used and modified by multiple users simultaneously, it’s more likely that multiple nodes will be modified separately at the same time. Chances are good that if non-conflicting changes are made by each user, then each of the changes can be replicated to multiple nodes without any of the users being dinged for a conflict (the typical optimistic concurrency failure). That said, the same thing can be accomplished with Aggregates and Event Sourcing when events are exchanged between nodes.

The problem is easier to reason about if you simply think about many users hitting the same URL, with a counter maintained for that URL and partitioned across many nodes. As each of many users hits the URL, the counter is incremented on any one of the multiple nodes. What’s the actual total? Well, each time the counter is incremented on any given node it will take some time before the other nodes are also incremented for that same URL access. Yet, within some period of time it is possible that one or more nodes will reflect the true total number of hits, that is, if the URL isn’t pounded every few milliseconds 24×7. In any case, any one of the nodes will show at least a close approximation of the actual total.
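
In terms of the GCounter sketched earlier, the URL hit counter scenario can be illustrated like this (the node names are purely for illustration):

public final class UrlHitCounterExample {

    public static void main(String[] args) {
        // Each node counts the hits it serves locally; periodic gossip of
        // state plus a max-merge moves every node toward the true total.
        GCounter nodeA = new GCounter("node-a");
        GCounter nodeB = new GCounter("node-b");

        nodeA.increment();   // two hits served by node A
        nodeA.increment();
        nodeB.increment();   // one hit served by node B

        nodeA.merge(nodeB);  // gossip exchange in both directions
        nodeB.merge(nodeA);

        // Both nodes now report 3; between merges each node shows at
        // least a close approximation of the actual total.
        System.out.println(nodeA.value() + " " + nodeB.value());
    }
}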

References

The official paper on CRDTs is provided by INRIA. You can also see a good overview presentation by Sean Cribbs.

The Ideal Domain-Driven Design Aggregate Store?

At the 2014 DDD eXchange in NYC, a park bench discussion developed around storing Aggregates. The consensus among the DDD leadership was against Object-Relational Mapping (ORM) and the desire to come up with a better way to store Aggregates. There were comments about ORM in general being an antiquated approach. While some developers are still new to ORMs, the technology of shoehorning objects into relational databases is more than 20 years old. In 20+ years, why haven’t we found a better way to store Aggregates?

During the park bench discussion I promoted the idea of serializing Aggregates as JSON and storing them in that object notation in a document store. A JSON-based store would enable you to query the object’s fields. Central to the discussion, there would be no need to use an ORM. This would help to keep the Domain Model pure and save days or weeks of time generally spent fiddling with mapping details. Even more, your objects could be designed in just the way your Ubiquitous Language is developed, and without any object-relational impedance mismatch whatsoever. Anyone who has used ORM with DDD knows that the limitations of mapping options regularly impede your modeling efforts.

When thinking of a JSON-based store, no doubt your mind is immediately drawn to MongoDB; after all, storing JSON-like documents is just how MongoDB works. While true, MongoDB still falls short of filling the needs of DDD Aggregates in one very important way. In our park bench discussion I noted how MongoDB was close to what I wanted, but that you could not use MongoDB to both update an Aggregate’s state in one collection in the store and append one or more new Domain Events to a different collection in the same operation. In short, MongoDB doesn’t support ACID transactions that span documents or collections. This is a big problem when you want to use Domain Events along with your Aggregates, but you don’t want to use Event Sourcing. That is, your Domain Events are an adjunct to your Aggregate state, not its left fold. Hopefully I don’t have to explain the problems that would occur if we successfully saved an Aggregate’s state to MongoDB, but failed to append a new Domain Event to the same storage. That would simply make the state of the application completely wrong, and no doubt would lead to inconsistencies in dependent parts of our own Domain Model and/or those in one or more other Bounded Contexts.

Rumor has it that MongoDB will at some future time support ACID transactions. In fact there is now a branch of MongoDB that supports ACID transactions. It’s the TokuMX project. Although you may personally feel comfortable using this product, it didn’t excite me. Frankly, it could be a huge challenge to get a given enterprise to support MongoDB in the first place, let alone trying to convince every stakeholder to support a branch of MongoDB that is delivered by a lesser known third party. It seems to me that the best chance to use MongoDB with ACID transactions in your project is when you can finally download it from MongoDB.org.

For me this meant looking elsewhere, and boy, I am glad I did. I believe that I have found the truly ideal DDD Aggregate store in PostgreSQL 9.4. Here are the main reasons why I think this is so:

  • PostgreSQL 9.4 supports both text-based JSON (json datatype) and binary JSON (jsonb datatype). The binary JSON type is a higher performing datatype than the text-based datatype.
  • You can query directly against the JSON, and create indexes on specific JSON object fields/attributes.
  • PostgreSQL is, of course, a relational database and supports ACID transactions.
  • PostgreSQL is a very mature open source product and comes with support tools such as the Postgres Enterprise Manager and the like.
  • You can get both community and commercial support for PostgreSQL, and you have a choice among multiple support vendors.
  • PostgreSQL is fast. I mean, PostgreSQL is seriously fast. In benchmarks around version 9.4, PostgreSQL can perform database writes at or near 14,000 transactions per second. You will be hard pressed to find many projects that will need to perform anywhere near that fast or faster. I don’t have the comparison benchmarks handy, but I believe that is significantly faster than MongoDB (without ACID transactions). In my experience, PostgreSQL 9.4 (and later versions) could most likely address the performance needs of something like 97% of all enterprise projects globally. Of course your mileage may vary, but I regularly poll developers for performance numbers. The majority need (far) less than 1,000 transactions per second, and only a few require anywhere near 10,000 transactions per second.
  • Using PostgreSQL’s JSON support is just plain easy.

What I will do next is step through how easy it is to use PostgreSQL to create DDD Aggregate storage.

Developing a PostgreSQL JSON Repository

If you are familiar with my book, Implementing Domain-Driven Design, you recall the Core Domain named the Agile Project Management Context. In that Bounded Context we model a project management application for Scrum-based Products. A Product is an Entity that serves as the Root of the Aggregate:

public class Product extends Entity {

    private Set<ProductBacklogItem> backlogItems;
    private String description;
    private ProductDiscussion discussion;
    private String discussionInitiationId;
    private String name;
    private ProductId productId;
    private ProductOwnerId productOwnerId;
    private TenantId tenantId;
    ...
}

I am going to create a Repository to persist Product instances and find them again. Let’s first take a look at the basic means for persisting Product instances, and then we will look at querying for them. Here is the Repository declaration and the methods used to save and remove Product instances:

public class PostgreSQLJSONProductRepository
   extends AbstractPostgreSQLJSONRepository
   implements ProductRepository {
   ...
   @Override
   public ProductId nextIdentity() {
      return new ProductId(UUID.randomUUID().toString().toUpperCase());
   }
   ...
   @Override
   public void remove(Product aProduct) {
      this.deleteJSON(aProduct);
   }

   @Override
   public void removeAll(Collection<Product> aProductCollection) {
      this.deleteJSON(aProductCollection);
   }

   @Override
   public void save(Product aProduct) {
      this.saveAsJSON(aProduct);
   }

   @Override
   public void saveAll(Collection<Product> aProductCollection) {
      this.saveAsJSON(aProductCollection);
   }
   ...
}

That’s pretty simple. The bulk of the work is in the abstract base class, AbstractPostgreSQLJSONRepository. The only method that must be overridden and implemented by the concrete sub-class is the tableName(), which allows the abstract base class to know the name of the table in which the concrete type is stored:

public class PostgreSQLJSONProductRepository
      extends AbstractPostgreSQLJSONRepository
      implements ProductRepository {
   ...
   @Override
   protected String tableName() {
      return "tbl_products";
   }
   ...
}

Let’s take a look inside that base class:

public abstract class AbstractPostgreSQLJSONRepository {

   private ObjectSerializer serializer;
   ...
   protected AbstractPostgreSQLJSONRepository() {
      super();

      this.serializer = ObjectSerializer.instance();
   }

   protected void close(ResultSet aResultSet) {
      if (aResultSet != null) {
         try {
            aResultSet.close();
         } catch (Exception e) {
            // ignore
         }
      }
   }
	
   protected void close(Statement aStatement) {
      if (aStatement != null) {
         try {
            aStatement.close();
         } catch (Exception e) {
            // ignore
         }
      }
   }
	
   protected Connection connection() throws SQLException {
      Connection connection =
            PostgreSQLPooledConnectionProvider
                  .instance()
                  .connection();

      return connection;
   }

   protected void deleteJSON(Identifiable<Long> anAggregateRoot) {
      try {
         Connection connection = this.connection();

         this.deleteJSON(connection, anAggregateRoot);

      } catch (Exception e) {
         throw new RuntimeException("Cannot delete: " + anAggregateRoot + " because: " + e.getMessage());
      }
   }

   protected void deleteJSON(
         Collection<? extends Identifiable<Long>> anAggregateRoots) {
		
         try {
            Connection connection = this.connection();

            for (Identifiable<Long> root : anAggregateRoots) {
               this.deleteJSON(connection, root);
            }

         } catch (Exception e) {
            throw new RuntimeException("Cannot delete: " + anAggregateRoots + " because: " + e.getMessage());
         }
   }
	
   protected <T extends Object> T deserialize(String aSerialization, final Class<T> aType) {
      return this.serializer.deserialize(aSerialization, aType);
   }

   ...

   protected String serialize(Object anAggregate) {
      return this.serializer.serialize(anAggregate);
   }

   protected abstract String tableName();

   protected void saveAsJSON(Identifiable<Long> anAggregateRoot) {
      if (anAggregateRoot.isUnidentified()) {
         this.insertAsJSON(anAggregateRoot);
      } else {
         this.updateAsJSON(anAggregateRoot);
      }
   }
	
   protected void saveAsJSON(Collection<? extends Identifiable<Long>> anAggregateRoots) {
      try {
         Connection connection = this.connection();

         for (Identifiable<Long> aggregateRoot : anAggregateRoots) {
            if (aggregateRoot.isUnidentified()) {
               this.insertAsJSON(connection, aggregateRoot);
            } else {
               this.updateAsJSON(connection, aggregateRoot);
            }
         }
	        
      } catch (Exception e) {
         throw new RuntimeException("Cannot save: " + anAggregateRoots + " because: " + e.getMessage());
      }
   }

   private void deleteJSON(
         Connection aConnection,
         Identifiable<Long> anAggregateRoot)
   throws SQLException {
		
      PreparedStatement statement = null;
		
      try {
         statement = aConnection.prepareStatement(
               "delete from "
               + this.tableName()
               + " where id = ?");

         statement.setLong(1, anAggregateRoot.identity());
         statement.executeUpdate(); 

      } finally {
         this.close(statement);
      }
   }

   private void insertAsJSON(Identifiable<Long> anAggregateRoot) {
      try {
         Connection connection = this.connection();

         this.insertAsJSON(connection, anAggregateRoot);

      } catch (Exception e) {
         throw new RuntimeException("Cannot save: " + anAggregateRoot + " because: " + e.getMessage());
      }
   }

   private void insertAsJSON(
         Connection aConnection,
         Identifiable<Long> anAggregateRoot)
   throws Exception {

      PreparedStatement statement = null;

      try {
         String json = this.serialize(anAggregateRoot);
			
         PGobject jsonObject = new PGobject();
         jsonObject.setType("json");
         jsonObject.setValue(json);

         statement = aConnection.prepareStatement(
               "insert into "
               + this.tableName()
               + " (data) values (?)");

         statement.setObject(1, jsonObject); 
         statement.executeUpdate(); 

      } finally {
         this.close(statement);
      }
   }

   private void updateAsJSON(Identifiable<Long> anAggregateRoot) {
      try {
         Connection connection = this.connection();

         this.updateAsJSON(connection, anAggregateRoot);

      } catch (Exception e) {
         throw new RuntimeException("Cannot update: " + anAggregateRoot + " because: " + e.getMessage());
      }
   }
	
   private void updateAsJSON(
         Connection aConnection,
         Identifiable<Long> anAggregateRoot)
   throws SQLException {

      PreparedStatement statement = null;

      try {
         String json = this.serialize(anAggregateRoot);

         PGobject jsonObject = new PGobject();
         jsonObject.setType("json");
         jsonObject.setValue(json);

         statement = aConnection.prepareStatement(
               "update "
               + this.tableName()
               + " set data = ?"
               + " where id = ?");

         statement.setObject(1, jsonObject);
         statement.setLong(2, anAggregateRoot.identity());
         statement.executeUpdate();

      } finally {
         this.close(statement);
      }
   }
}

Here are the highlights from the abstract base class with regard to saving and removing Aggregates to and from the store:

  • We use an ObjectSerializer to serialize Aggregate instances to JSON, and to deserialize them from JSON back to their Aggregate instance state. This ObjectSerializer is the same one I used in my book, which is based on the Google Gson parser. The biggest reason I use this JSON parser is because it works by introspection and reflection on object fields rather than requiring objects to support the JavaBean specification (yuk!).
  • There are special methods that help close ResultSet and PreparedStatement instances.
  • Each Repository gets a JDBC Connection to the database using PostgreSQLPooledConnectionProvider. All of the operations are simple, lightweight JDBC operations. As indicated by its name, the PostgreSQLPooledConnectionProvider provides pooled Connections that are thread bound using ThreadStatic.
  • You can delete and insert one or many Aggregate instances in one operation. This supports remove(), removeAll(), save(), and saveAll() in the concrete sub-classes.
  • All communication via JDBC uses the PGobject type to carry the JSON payload to and from the database. The PGobject type in this code is “json” and the value is a JSON String object. You can easily switch the code to the more efficient “jsonb” type.

Note another detail. All Aggregate Root Entities are passed into the abstract base class as Identifiable instances. This enables the base class Repository to determine whether the instances have already been saved to the data store on prior operations, or if this is the first time. For first time persistence the Repository uses an INSERT operation. For subsequent saves after having read the Aggregate instances from the store the operation will be an UPDATE. The Entity type in the Agile Project Management code base implements the Identifiable interface:

public interface Identifiable<T> {
   public T identity();
   public void identity(T aValue);
   public boolean isIdentified();
   public boolean isUnidentified();
}

public abstract class Entity implements Identifiable<Long> {
    ...
    private Long surrogateIdentity;

    public Entity() {
        super();

        this.identity(0L);
    }
    ...
    @Override
    public Long identity() {
       return this.surrogateIdentity == null ? 0L : this.surrogateIdentity;
    }

    @Override
    public void identity(Long aValue) {
       this.surrogateIdentity = aValue;
    }

    @Override
    public boolean isIdentified() {
       return identity() > 0;
    }

    @Override
    public boolean isUnidentified() {
       return identity() <= 0;
    }
    ...
}

Supporting this interface enables the various saveAsJSON() methods to interrogate each Aggregate instance for its surrogate identity. If the surrogate identity is not yet set, it knows that the Aggregate instance is new and must be inserted. If the surrogate identity is set, the Repository knows that it is a preexisting instance that must be updated to the data store. The surrogate identity is stored as the row’s primary key in the table.


Follow the Aggregate Rule of Thumb: Reference Other Aggregates By Identity Only

Following this rule is very important as it makes your Aggregate instance simple to serialize. If instead you use a graph of Aggregate instances, don’t expect fabulous things from the JSON serializer.


Speaking of database, here is a simple database SQL script used to create the database and tables used by the solution:

drop database if exists agilepm;
create database agilepm owner postgres;

create table tbl_events
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_publishednotificationtracker
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_timeconstrainedprocesstrackers
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_backlogitems
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_productowners
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_products
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_releases
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_sprints
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_teammembers
(
    id             bigserial primary key,
    data           json not null
);

create table tbl_teams
(
    id             bigserial primary key,
    data           json not null
);

As you can see, these are all very simple tables. The JSON is stored in the column named data. The bigserial column type is a bigint (8 bytes) backed by a sequence. As you insert new rows into one of the tables, its sequence is used to auto-increment the primary key. Note that tbl_events, which holds each Domain Event published by the Bounded Context (see Chapter 8 of my book), has a primary key as well. This serial bigint primary key serves as the unique notification identity for messaging notifications that are published inside and outside the Bounded Context.

Finally let’s take a look at how Aggregate instances stored as JSON inside the database are found. Note that we will be querying inside the data column of each database table. We use simple -> and ->> notation to navigate from data down into each JSON object. For example, here are the three finder methods found in the Repository for Products, the PostgreSQLJSONProductRepository:

public class PostgreSQLJSONProductRepository
      extends AbstractPostgreSQLJSONRepository
      implements ProductRepository {
   ...
   @Override
   public Collection<Product> allProductsOfTenant(TenantId aTenantId) {
      String filter = "data->'tenantId'->>'id' = ?";

      return this.findAll(Product.class, filter, "", aTenantId.id());
   }

   @Override
   public Product productOfDiscussionInitiationId(
         TenantId aTenantId,
         String aDiscussionInitiationId) {

      String filter = "data->'tenantId'->>'id' = ? and data->>'discussionInitiationId' = ?";

      return this.findExact(Product.class, filter, aTenantId.id(), aDiscussionInitiationId);
   }

   @Override
   public Product productOfId(TenantId aTenantId, ProductId aProductId) {
      String filter = "data->'tenantId'->>'id' = ? and data->'productId'->>'id' = ?";

      return this.findExact(Product.class, filter, aTenantId.id(), aProductId.id());
   }
   ...
}

From the data column we filter using a WHERE clause. The full SELECT statement is found in the abstract base class, which we will examine in a moment. To keep the finder interfaces very simple I only require the concrete Repository to provide the actual matching parts, as seen in the code snippet above. There are several tokens in each filter. The data token refers to the data column in the given row. The other tokens such as 'tenantId', 'id', and 'productId' are the JSON field names. So, to match on the tenant identity in the JSON you use data->'tenantId'->>'id' = ? as part of the WHERE clause. Note that -> navigates through the intermediate JSON objects above the target field, while ->> points to the final target field and yields it as text.

You can findAll() or findExact(), which find a Collection of a specific type or find a single instance of a specific type, respectively:

public abstract class AbstractPostgreSQLJSONRepository {
   ...
   protected <T extends Identifiable<Long>> List<T> findAll(
         Class<T> aType,
         String aFilterExpression,
         String anOrderBy,
         Object ... anArguments) {

      List<T> aggregates = new ArrayList<T>();
      PreparedStatement statement = null;
      ResultSet result = null;

      String query =
            "select id, data from "
            + this.tableName()
            + " where "
            + aFilterExpression
            + " "
            + anOrderBy;

      try {
         Connection connection = this.connection();

         statement = connection.prepareStatement(query);

         this.setStatementArguments(statement, anArguments);

         result = statement.executeQuery();

         while (result.next()) {
            Long identity = result.getLong(1);

            String serialized = result.getObject(2).toString();
            	
            T aggregate = this.deserialize(serialized, aType);
            	
            aggregate.identity(identity);

            aggregates.add(aggregate);
         }

      } catch (Exception e) {
         throw new RuntimeException("Cannot find: " + query + " because: " + e.getMessage());
      } finally {
         this.close(statement);
         this.close(result);
      }

      return aggregates;
   }
	
   protected <T extends Identifiable<Long>> T findExact(
         Class<T> aType,
         String aFilterExpression,
         Object ... anArguments) {

      T aggregate = null;

      List<T> aggregates = this.findAll(aType, aFilterExpression, "", anArguments);

      if (!aggregates.isEmpty()) {
         aggregate = aggregates.get(0);
      }

      return aggregate;
   }
   ...
   private void setStatementArguments(
         PreparedStatement aStatement,
         Object[] anArguments)
   throws SQLException {

      for (int idx = 0; idx < anArguments.length; ++idx) {
         Object argument = anArguments[idx];
         Class<?> argumentType = argument.getClass();

         if (argumentType == String.class) {
            aStatement.setString(idx+1, (String) argument);
         } else if (argumentType == Integer.class) {
            aStatement.setInt(idx+1, (Integer) argument);
         } else if (argumentType == Long.class) {
            aStatement.setLong(idx+1, (Long) argument);
         } else if (argumentType == Boolean.class) {
            aStatement.setBoolean(idx+1, (Boolean) argument);
         } else if (argumentType == Date.class) {
            java.sql.Date sqlDate = new java.sql.Date(((Date) argument).getTime());
            aStatement.setDate(idx+1, sqlDate);
         } else if (argumentType == Double.class) {
            aStatement.setDouble(idx+1, (Double) argument);
         } else if (argumentType == Float.class) {
            aStatement.setFloat(idx+1, (Float) argument);
         }
      }
   }
   ...
}

The backbone of the finders is implemented in findAll(), which findExact() reuses. Note that when the ResultSet is obtained we iterate over each entry. Using findAll() you can both filter and order the outcome by a specific column or JSON field.

We obtain both the surrogate identity and the JSON serialization payload. Once the JSON is used to deserialize to the Aggregate instance, we set the surrogate identity as the identity of the Identifiable. This prepares the Aggregate instance for updating should the client decide to modify the instance and call save() on the Product Repository.

Well, that’s pretty much it. Every concrete Repository implemented using the AbstractPostgreSQLJSONRepository is very simple and straightforward. I intend to push the implementation to its GitHub repository as soon as possible. That should give you everything you need to implement this in your own project.

An Approach to Composing Aggregate Boundaries

For complete coverage of this topic, you should see my book: Domain-Driven Design Distilled

Modeling Aggregates with DDD and Entity Framework

For everyone who has read my book and/or Effective Aggregate Design, but has been left wondering how to implement Aggregates with Domain-Driven Design (DDD) on the .NET platform using C# and Entity Framework, this post is for you.

[NOTE: As expected, this article has within hours of posting received some criticism for the approach used to O-R mapping with Entity Framework. Actually the article received much more praise than criticism, but… I want to just point out that I am purposely not attempting to win any guru award in Entity Framework mapping. If you browse through this post too quickly some of the key words of wisdom and my intent may be lost on your speed reading. I am purposely avoiding some of the expert guidance that is typically given with a view to deep understanding of Entity Framework mappings. In fact, you may not realize the purpose of the article unless you begin reading with the assumed attitude that “I hate O-R mapping.” The O-R mapping tooling is actually something like 20+ years old, and it is time that we come up with more practical solutions to storing objects as objects. In the meantime we should just do as little O-R mapping as we can get away with. So, thanks for your words of advice, but I have done everything below with precise intent.]

Definition of Aggregate

To start off, let’s recap the basic definition of DDD Aggregate. First and foremost the Aggregate pattern is about transactional consistency. At the end of a committed database transaction, a single Aggregate should be completely up to date. That means that any business rules regarding data consistency must be met and the persistence store should hold that consistent state, leaving the Aggregate correct and ready to use by the next use case. Figure 1 illustrates two such consistency boundaries, with two different Aggregates.

Aggregates

Figure 1. Two Aggregates, which represent two transactional consistency boundaries.

The problem that many have with designing Aggregates is that they don’t consider the true business constraints that require data to be transactionally consistent and instead design Aggregates in large clusters as shown in Figure 2. Designing Aggregates in this way is a big mistake if you expect them (1) to be used by many thousands of users, (2) to perform well, and (3) to scale to the demands of the Internet.

LargeCluster

Figure 2. A poorly designed Aggregate that is not conceived according to true business consistency constraints.

Using an example from my book, a set of well-designed Aggregates are shown in Figure 3. These are based on true business rules that require specific data to be up-to-date at the end of a successful database transaction. These follow the rules of Aggregate, including designing small Aggregates.

FourSmallAggregates

Figure 3. Some well-designed Aggregates that adhere to true consistency rules.

Still, the question arises: if BacklogItem and Product have some data dependencies, how do we update both of them? This points to another rule of Aggregate design, to use eventual consistency as shown in Figure 4. Of course, there’s a bit more involved when you consider the overall architecture, but the foregoing points out the high-level composition guidance of Aggregate design.

EventualConsistency

Figure 4. When two or more Aggregates have at least some dependencies on updates, use eventual consistency.

Now with this brief refresher on the basics of Aggregate design, let’s see how we might map the Product to a database using Entity Framework.

KISS with Entity Framework

So, we have four prominent Aggregates in our Scrum project management application: Product, BacklogItem, Release, and Sprint. We need to persist the state of these four small Aggregates and we want to use Entity Framework to do so. Here’s a possible surprise for you. I am not going to recommend that you need to become an Entity Framework guru. Nope, just the opposite in fact. I am going to suggest that you allow the Entity Framework development team to be the gurus, and you just focus on your specific application. After all, your Core Domain is where you want to put your creative energies, not in becoming an expert in Entity Framework.

What I am recommending is that you allow Entity Framework to take control of doing what it does best and we just stay out of its way. Entity Framework has a certain way of mapping entities into the database, and that’s just how it works. As soon as you try to step outside the basics and go to some extremes of esoteric mapping techniques in ways that Entity Framework was not meant to be used, you are going to experience a lot of pain. Still, we can get quite a bit of mileage out of Entity Framework in the midst of DDD and be quite happy with the way it all works out. To do so we are going to use just a few basic mapping techniques. If you follow my KISS guidance you can mostly ignore your Entity Framework documentation and how-to books. Just allow Entity Framework to map entities and get back to what will make a difference in this competitive world: your market-distinguishing application.

We are going to implement the Product Aggregate using two approaches. One approach uses a Separated Interface with an implementation class, and the other uses a domain object backed by a state object. The whole point of these examples is to stay as far out of Entity Framework’s way as possible.

Using a Separated Interface and Implementation Class

For the first example I create a Separated Interface that is implemented by a concrete domain object. Figure 5 shows you the basic intention of this approach.

EntityFramework1

Figure 5. The Separated Interface named IProduct is implemented by a concrete domain object. Clients directly use only IProduct.

It is pretty typical when programming with C# and .NET to name your interfaces with an “I” prefix, so we will use IProduct:

interface IProduct
{
  ICollection<IBacklogItem> AllBacklogItems();
  IProductBacklogItem BacklogItem(BacklogItemId backlogItemId);
  string Description { get; }
  string Name { get; }
  IBacklogItem PlanBacklogItem(BacklogItemId newBacklogItemId, string summary,
      string story, string category, BacklogItemType type, StoryPoints storyPoints);
  void PlannedProductBacklogItem(IBacklogItem backlogItem);
  ...
  ProductId ProductId { get; }
  ProductOwnerId ProductOwnerId { get; }
  void ReorderFrom(BacklogItemId id, int ordering);
  TenantId TenantId { get; }
}

With this interface we can create a concrete implementation class. Let’s call it Product:

public class Product : IProduct
{
  [Key]
  public string ProductKey { get; set; }
  ...
}

The point of the concrete class Product is to implement the business interface declared by IProduct and to also provide the accessors that are needed by Entity Framework to map the object into and out of the database. Note the ProductKey property. This is technically the kind of primary key that Entity Framework wants to work with. However, it is different from the ProductId, which when combined with the TenantId is the business identity. Therefore, internally the ProductKey must be set to a composite of TenantId as a string and ProductId as a string:

ProductKey = TenantId.Id + ":" + ProductId.Id;

I think you get the idea. We create an interface that we want our client to see and we hide the implementation details inside the implementing class. We make the implementation match up to really basic Entity Framework mappings. We purposely try to keep our special mappings, as with ProductKey, to a minimum. This helps keep the DbContext very simple by registering the implementation classes:

public class AgilePMContext : DbContext
{
  public DbSet<Product> Products { get; set; }
  public DbSet<ProductBacklogItem> ProductBacklogItems { get; set; }
  public DbSet<BacklogItem> BacklogItems { get; set; }
  public DbSet<Task> Tasks { get; set; }
  ...
}

Rather than fully fleshing out the details of this approach, there is enough detail already to make some judgments. I’d like to discuss the fundamental flaws that I see in it:

  1. The Ubiquitous Language is not really reinforced by using interfaces such as IProduct, IBacklogItem, etc. IProduct and IBacklogItem are not in our Ubiquitous Language, but Product and BacklogItem are. Thus, the client facing names should be Product, BacklogItem, and the like. We could accomplish this simply by naming the interfaces Product, BacklogItem, Release, and Sprint, but that would mean we would have to come up with sensible names for the implementation classes. Let’s just pause there and move on to the second and related issue.
  2. There is really no good reason to create a Separated Interface. It would be very unlikely that we would ever create two or more implementations of IProduct or any of the other interfaces. The best reason we have for creating a Separated Interface is when there could be, or are, multiple implementations, and that is just not going to happen in this Core Domain.

Based on these two points alone I would personally choose to abandon this approach before going any further with it. When using Domain-Driven Design the most important and overarching principle is to adhere to the Ubiquitous Language, and from the get-go this approach is driving us away from business terminology rather than toward it.

Domain Object Backed By a State Object

The second approach uses a domain object backed by state objects. As shown in Figure 6, the domain object defines and implements the domain-driven model using the Ubiquitous Language, and the state objects hold the state of the Aggregate.

EntityFramework2

Figure 6. The domain object that models the Aggregate behavior is backed by a state object that holds the model’s state.

By keeping state objects separate from the domain-driven implementation objects, we enable very simple mappings. We let Entity Framework do what it knows how to do by default to map entities to and from the database. Consider Product, which is backed by the ProductState object. We have two Product constructors: a public business constructor for normal clients and a second internal constructor that is used only by internal implementation components:

public class Product
{
  public Product(
      TenantId tenantId,
      ProductId productId,
      ProductOwnerId productOwnerId,
      string name,
      string description)
  {
    State = new ProductState();
    State.ProductKey = tenantId.Id + ":" + productId.Id;
    State.ProductOwnerId = productOwnerId;
    State.Name = name;
    State.Description = description;
    State.BacklogItems = new List<ProductBacklogItem>();
  }

  internal Product(ProductState state)
  {
    State = state;
  }
  ...
}

When the business constructor is invoked we create a new ProductState object and initialize it. The state object has a simple string-based identity:

public class ProductState
{
  [Key]
  public string ProductKey { get; set; }

  public ProductOwnerId ProductOwnerId { get; set; }

  public string Name { get; set; }

  public string Description { get; set; }

  public List<ProductBacklogItemState> BacklogItems { get; set; }
  ...
}

The ProductKey is actually encoded with two properties, the TenantId as a string and the ProductId as a string, with the two separated by a ‘:’ character. Including the TenantId in the ProductKey ensures that all data stored in the database is segregated by tenant. We must still support client requests for TenantId and ProductId from the Product:

public class Product
{
  ...
  public ProductId ProductId { get { return new ProductId(State.DecodeProductId()); } }
  ...
  public TenantId TenantId { get { return new TenantId(State.DecodeTenantId()); } }
  ...
}

The ProductState object must support both DecodeProductId() and DecodeTenantId() methods. We could also choose to design the state object to redundantly hold the whole identities separate from the ProductKey:

public class ProductState
{
  [Key]
  public string ProductKey { get; set; }

  public ProductId ProductId { get; set; }

  public ProductOwnerId ProductOwnerId { get; set; }

  public string Name { get; set; }

  public string Description { get; set; }

  public List<ProductBacklogItemState> BacklogItems { get; set; }

  public TenantId TenantId { get; set; }
  ...
}

This could be well worth the slight memory overhead if converting to identities had a heavy performance footprint. All of the identity types, including ProductOwnerId, are Value Objects and are flattened and mapped into the same database row that ProductState occupies:

[ComplexType]
public class ProductOwnerId : Identity
{
  public ProductOwnerId()
      : base()
  {
  }

  public ProductOwnerId(string id)
      : base(id)
  {
  }
}

The [ComplexType] attribute marks the Value Object as a complex type, which is different from an entity. Complex types are non-scalar values that do not have keys and cannot be managed apart from their containing entity, or the complex type within which they are nested. Marking a Value Object with the Entity Framework [ComplexType] causes the data of the Value Object to be saved to the same database row as the entity. In this case, ProductOwnerId would be saved to the same database row as the ProductState entity.

Here are the base types for all Identity types of Value Objects:

public abstract class Identity : IEquatable<Identity>, IIdentity
{
  public Identity()
  {
    this.Id = Guid.NewGuid().ToString();
  }

  public Identity(string id)
  {
    this.Id = id;
  }

  public string Id { get; set; }

  public bool Equals(Identity id)
  {
    if (object.ReferenceEquals(this, id)) return true;
    if (object.ReferenceEquals(null, id)) return false;
    return this.Id.Equals(id.Id);
  }

  public override bool Equals(object anotherObject)
  {
    return Equals(anotherObject as Identity);
  }

  public override int GetHashCode()
  {
    return (this.GetType().GetHashCode() * 907) + this.Id.GetHashCode();
  }

  public override string ToString()
  {
    return this.GetType().Name + " [Id=" + Id + "]";
  }
}

public interface IIdentity
{
  string Id { get; set; }
}

So, the ProductState object stands on its own when it comes to persisting the state of the Product. However, the ProductState also holds another collection of entities; that is, the List of ProductBacklogItemState:

public class ProductState
{
  [Key]
  public string ProductKey { get; set; }
  ...
  public List<ProductBacklogItemState> BacklogItems { get; set; }
  ...
}

This is all well and good because we keep the database mappings really simple. Yet, how do we get a ProductBacklogItemState object, or the entire List collection for that matter, into a format that we can allow clients to consume? The ProductBacklogItemState is an internal implementation detail, just a data holder. This points to the need for a few simple converters, which are used by the Product Aggregate root:

public class Product
{
  ...
  public ICollection<ProductBacklogItem> AllBacklogItems()
  {
    List<ProductBacklogItem> all =
        State.BacklogItems.ConvertAll(
            new Converter<ProductBacklogItemState, ProductBacklogItem>(
                ProductBacklogItemState.ToProductBacklogItem));

    return new ReadOnlyCollection<ProductBacklogItem>(all);
  }

  public ProductBacklogItem BacklogItem(BacklogItemId backlogItemId)
  {
    ProductBacklogItemState state =
        State.BacklogItems.FirstOrDefault(
            x => x.BacklogItemKey.Equals(backlogItemId.Id));

    return new ProductBacklogItem(state);
  }
  ...
}

Here we convert a collection of ProductBacklogItemState instances to a collection of ProductBacklogItem instances. And when the client requests just one ProductBacklogItem, we convert to one from a single ProductBacklogItemState with the matching identity. The ProductBacklogItemState object must only support a few simple conversion methods:

public class ProductBacklogItemState
{
  [Key]
  public string BacklogItemKey { get; set; }
  ...
  public ProductBacklogItem ToProductBacklogItem()
  {
    return new ProductBacklogItem(this);
  }

  public static ProductBacklogItem ToProductBacklogItem(
        ProductBacklogItemState state)
  {
    return new ProductBacklogItem(state);
  }
  ...
}

Should the client ask repeatedly for a collection of ProductBacklogItem instances the Product could cache the collection after the first time it is generated.

In the end our goal is to stay out of the way of Entity Framework and make it super simple to map state objects in and out of the database. I think when you consider the DbContext for this solution you will conclude that we have a really simple approach:

public class AgilePMContext : DbContext
{
  public DbSet<ProductState> Products { get; set; }
  public DbSet<ProductBacklogItemState> ProductBacklogItems { get; set; }
  public DbSet<BacklogItemState> BacklogItems { get; set; }
  public DbSet<TaskState> Tasks { get; set; }
  public DbSet<ReleaseState> Releases { get; set; }
  public DbSet<ScheduledBacklogItemState> ScheduledBacklogItems { get; set; }
  public DbSet<SprintState> Sprints { get; set; }
  public DbSet<CommittedBacklogItemState> CommittedBacklogItems { get; set; }
  ...
}

Creating and using a ProductRepository is easy as well:

public interface ProductRepository
{
  void Add(Product product);

  Product ProductOfId(TenantId tenantId, ProductId productId);
}

public class EFProductRepository : ProductRepository
{
  private AgilePMContext context;

  public EFProductRepository(AgilePMContext context)
  {
    this.context = context;
  }

  public void Add(Product product)
  {
    try
    {
      context.Products.Add(product.State);
    }
    catch (Exception e)
    {
      Console.WriteLine("Add() Unexpected: " + e);
    }
  }

  public Product ProductOfId(TenantId tenantId, ProductId productId)
  {
    string key = tenantId.Id + ":" + productId.Id;
    var state = default(ProductState);

    try
    {
      state = (from p in context.Products
               where p.ProductKey == key
               select p).FirstOrDefault();
    }
    catch (Exception e)
    {
      Console.WriteLine("ProductOfId() Unexpected: " + e);
    }

    if (EqualityComparer<ProductState>.Default.Equals(state, default(ProductState)))
    {
      return null;
    }
    else
    {
      return new Product(state);
    }
  }
}

// Using the repository
using (var context = new AgilePMContext())
{
  ProductRepository productRepository = new EFProductRepository(context);

  var product =
        new Product(
              new TenantId(),
              new ProductId(),
              new ProductOwnerId(),
              "Test",
              "A test product.");

  productRepository.Add(product);

  context.SaveChanges();
  ...
  var foundProduct = productRepository.ProductOfId(product.TenantId, product.ProductId);
}

Taking this approach will help us to stay focused on what really counts the most, our Core Domain and its Ubiquitous Language.

Effective Aggregate Design

This is a three-part series about using Domain-Driven Design (DDD) to implement Aggregates. Clustering Entities and Value Objects into an Aggregate with a carefully crafted consistency boundary may at first seem like quick work, but among all DDD tactical guidance, this pattern is one of the least well understood. This essay is the basis for Chapter 10 of my book, Implementing Domain-Driven Design.

The documents are available for download as three PDFs and are licensed under the Creative Commons Attribution-NoDerivs 3.0 Unported License.

Original English Edition

Effective Aggregate Design: Part 1
Effective Aggregate Design: Part 2
Effective Aggregate Design: Part 3

French Translation

Conception Efficace des Aggregates 1 ere Partie

DDD with Scala and Akka Revisited

There were a number of issues with the code I wrote in the post Using Scala and Akka with Domain-Driven Design. I am addressing those problems in this post and cleaning up the code. The good thing about revisiting this topic is that you can learn from the problems found in the previous post and see how to fix them.

Share Nothing

A major problem in the previous post is that I shared part of an Actor’s internal state with the outside. Recall that one of the basic rules of Actor Model is to share nothing. Oops.

The AggregateCache used a helper named AggregateCacheWorker. The existence of this class in itself is not a problem, especially when it is acting on behalf of the AggregateCache. The problem is that the DomainModel can send a ProvideWorker message to the AggregateCache to request a reference to its internal AggregateCacheWorker. That’s just wrong. The DomainModel should not be able to obtain a reference to any part of the state of AggregateCache.

So how can this problem be corrected? The main clue is to ask: Why did the DomainModel imagine it needed to use the AggregateCacheWorker? The reason is that the DomainModel needed to delegate Aggregate Actor creation to the AggregateCacheWorker because the AggregateCacheWorker holds the ActorContext needed to create a child of the AggregateCache. Yet, this is clearly not the way we should attempt to create new Aggregate Actors.

The solution to the problem actually already exists in the AggregateCache itself. Recall that when a message is sent from a client to an Aggregate Actor, the AggregateCache will check to see if that specific child Actor is currently in memory or not. If the Actor is not currently in memory, the AggregateCache creates the child Aggregate Actor dynamically and then dispatches the message being sent to it by the client. Thus, we can just leverage this AggregateCache behavior to lazily create an Aggregate Actor when the first message is sent to it.

But wait. Isn’t it a problem for the Aggregate Actor not to exist at all until a message is sent to it? Well, not if you consider that an Actor’s state is mutated by its handling of various messages. Therefore, the Actor really has an empty initial state until it receives its first command message. Here’s how it will work:

object DomainModelPrototype extends CompletableApp(1) {

  val model = DomainModel("OrderProcessing")

  model.registerAggregateType("co.vaughnvernon.orderprocessing.domain.model.Order")

  val order = model.aggregateOf("co.vaughnvernon.orderprocessing.domain.model.Order", "123")

  order ! InitializeOrder(249.95)

  ...

Did you notice some fundamental changes to the client code compared to the previous example? First of all, now when registering an Aggregate type we use the fully-qualified class name of the Actor. Then when the Aggregate is created using aggregateOf(), we pass in only the type and the globally unique id. We no longer pass in the Props, because there will purposely be no initial state in the Actor until it receives its first message, which you can see is sent as InitializeOrder just following its creation by aggregateOf().

Improved Solution

So, allow me to reintroduce the Scala classes from the top down, starting with the new DomainModel and companion object:

object DomainModel {
  def apply(name: String): DomainModel = {
    new DomainModel(name)
  }
}

class DomainModel(name: String) {
  val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  def aggregateOf(typeName: String, id: String): AggregateRef = {
    if (aggregateTypeRegistry.contains(typeName)) {
      val aggregateType = aggregateTypeRegistry(typeName)
      aggregateType.cacheActor ! RegisterAggregateId(id)
      AggregateRef(id, aggregateType.cacheActor)
    } else {
      throw new IllegalStateException(s"DomainModel type registry does not have a $typeName")
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      aggregateTypeRegistry(typeName) = AggregateType(actorRef)
    }
  }

  def shutdown() = {
    system.shutdown()
  }
}

Note that the DomainModel no longer attempts to use the AggregateCacheWorker. In fact, there is no longer such a worker class. Instead aggregateOf() now sends a message to the AggregateCache under which the Aggregate Actor is to exist:

aggregateType.cacheActor ! RegisterAggregateId(id)

This leads to the new implementation of the AggregateCache:

class AggregateCache(typeName: String) extends Actor {
  val aggregateClass: Class[Actor] = Class.forName(typeName).asInstanceOf[Class[Actor]]
  val aggregateIds = scala.collection.mutable.Set[String]()

  def receive = {
    case message: CacheMessage =>
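      // Look up the child Actor by id; if it is not currently in memory, lazily
      // (re)create it, but only for an identity previously registered via RegisterAggregateId.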
      val aggregate = context.child(message.id).getOrElse {
        if (!aggregateIds.contains(message.id)) {
          throw new IllegalStateException(s"No aggregate of type $typeName and id ${message.id}")
        } else {
          context.actorOf(Props(aggregateClass), message.id)
        }
      }
      aggregate.tell(message.actualMessage, message.sender)

    case register: RegisterAggregateId =>
      this.aggregateIds.add(register.id)
  }
}

The AggregateCache no longer holds a Map of type names to Props. Instead, it now contains a Set of unique identities for each Aggregate that has been registered. You can see that each identity is registered when a RegisterAggregateId message is sent from the DomainModel and received by the AggregateCache.

Even so, what would happen if the RegisterAggregateId message is not received by the AggregateCache until after the first message is sent from the client to the yet-to-be-created Aggregate Actor? Actually this is impossible because of a simple rule of Actor Model: When a message is sent to an Actor and the Actor has a default FIFO mailbox, that message is guaranteed to be received by the Actor before any subsequently sent messages. Thus, when the DomainModel sends RegisterAggregateId to the AggregateCache, there is no way that a subsequent CacheMessage sent to the Aggregate Actor in question will be received by the AggregateCache before the RegisterAggregateId is received.
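
To make the ordering concrete, here is a minimal sketch of the two sends involved, reusing the DomainModel, AggregateRef, and message classes shown above; the comments trace how both messages funnel through the same AggregateCache mailbox:

val order = model.aggregateOf(
      "co.vaughnvernon.orderprocessing.domain.model.Order", "123")
// aggregateOf() has already sent RegisterAggregateId("123") to the Order AggregateCache.

order ! InitializeOrder(249.95)
// The AggregateRef wraps this in CacheMessage("123", InitializeOrder(249.95), sender)
// and sends it to the same AggregateCache. Because that cache Actor's default mailbox
// is FIFO, RegisterAggregateId("123") is received before the CacheMessage, so the
// identity is always registered before the first command creates the child Order Actor.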

Now back to the state of the AggregateCache. It also has an aggregateClass instance value, which is created from the typeName. This is possible because the typeName now must be the fully-qualified class name of the Aggregate Actor type. The aggregateClass is passed as the only Props argument to the actorOf() function of the ActorContext. This allows the Actor to be dynamically created using the specific type for which each specific AggregateCache exists.

The support classes are only slightly different from the previous example:

case class AggregateRef(id: String, cache: ActorRef) {
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }
}

case class AggregateType(cacheActor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case class RegisterAggregateId(id: String)

Finally, here are the changes to the Order Aggregate:

class Order extends Actor {
  var amount: Double = _

  def receive = {
    case init: InitializeOrder =>
      println(s"Initializing Order with $init")
      this.amount = init.amount
    case processOrder: ProcessOrder =>
      println(s"Processing Order is $processOrder")
      DomainModelPrototype.completedStep()
  }
}

case class InitializeOrder(amount: Double)
case class ProcessOrder()

I think this addresses all the issues that were apparent from the original post. Hopefully it has reinforced the basic rule of Actor Model: Share Nothing.

There are still several other pieces of the DDD with Scala and Akka puzzle to snap into place. I’ll be introducing those over the next few weeks. Well, I can only promise that it will be ASAP.

Using Scala and Akka with Domain-Driven Design

If you’ve been following my theme for a while, you know I’ve spent a considerable amount of effort promoting Scala and Akka for use when Implementing Domain-Driven Design. Few seem to share my vision of a completely explicit use of Actors as Aggregates, with direct message sending between clients and the Aggregates (in which case the “client” may be, for example, the User Interface or another Aggregate).

NOTE: There is a follow up to this post where I address the problems found herein.

More recently there have been numerous suggestions to place various wrappers around the domain model’s Aggregates. This smells of Application Layer and its Application Services, or at least like a named cache/region such as is used in Coherence or GemFire. That’s absolutely not what I want. Sure, those mechanisms could exist behind the scenes, but not between a client and the Aggregate instance (an Actor) that it wants to communicate with. The latter makes me really unhappy. The main point of my emphasis on the use of Actor Model in the first place was to get rid of all the extraneous architecture layers that cloud our vision of what the core of the application—its domain model—is really doing.

Rather than abandon Akka as a means to employ Actor Model with DDD, what I did over the past few hours was prototype a few useful abstractions as a really simple set of Scala classes that could be used to implement my vision. So, here you go.

It seems appropriate to me that the basic interface to the desired functionality should be a DomainModel. Here’s a simple class and companion object:

object DomainModel {
  def apply(name: String): DomainModel = {
    new DomainModel(name)
  }
}

class DomainModel(name: String) {
  val aggregateTypeRegistry = scala.collection.mutable.Map[String, AggregateType]()
  val system = ActorSystem(name)

  def aggregateOf(typeName: String, props: Props, id: String) = {
    if (aggregateTypeRegistry.contains(typeName)) {
      val aggregateType = aggregateTypeRegistry(typeName)
      val worker = aggregateType.worker
      val actorRef = worker.aggregateOf(props, id)
      val cacheActor = aggregateType.actor
      AggregateRef(id, cacheActor)
    } else {
      AggregateRef(null, null)
    }
  }

  def registerAggregateType(typeName: String): Unit = {
    if (!aggregateTypeRegistry.contains(typeName)) {
      val actorRef = system.actorOf(Props(new AggregateCache(typeName)), typeName)
      implicit val timeout = Timeout(5 seconds)
      val future = actorRef ? ProvideWorker
      val worker = Await.result(future, timeout.duration).asInstanceOf[AggregateCacheWorker]
      aggregateTypeRegistry(typeName) = AggregateType(worker, actorRef)
    }
  }

  def shutdown() = {
    system.shutdown()
  }
}

After your DomainModel is initialized, the idea is to allow for the registration of various Aggregate types using registerAggregateType(). After you have registered any number of types that are appropriate for your Bounded Context, you can start creating instances of them using aggregateOf(). Here you pass the type name of the Aggregate that you want to create, as well as the Props of the Akka Actor, and the id of the Aggregate. The id is both the name of the Actor and the globally unique identity of the Aggregate instance being created. Finally, if your application needs to be shut down you must call the DomainModel shutdown() function.

Now, what is the reason for registering Aggregate types? This serves a few different purposes. First of all, there is an underlying Actor that implements a special node-specific cache. This cache Actor will serve as the parent of all Aggregate Actors within the registered type. All messages that will be sent to the Actor that implements a specific Aggregate instance must pass through the cache Actor. Here’s the class that serves as the parent of all Aggregate instances within a specified type, or named cache:

class AggregateCache(typeName: String) extends Actor {
  val worker = new AggregateCacheWorker(context)

  def receive = {
    case message: CacheMessage =>
      val aggregate = context.child(message.id).getOrElse {
        context.actorOf(worker.propsFor(message.id), message.id)
      }
      aggregate.tell(message.actualMessage, message.sender)

    case ProvideWorker =>
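      // NOTE: replying with the internal worker hands part of this Actor's state to the
      // outside; this is the share-nothing violation that the revisited post above corrects.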
      sender ! worker
  }
}

As you can see, the main functionality of the AggregateCache is to look up child Aggregate Actors and dispatch messages to them. If the look up fails it means that the Aggregate Actor instance is no longer in memory. This implies that Aggregates can be transient; that is, they may exist in memory for some period of time, and then after some degree of inactivity they can be closed and removed from memory. However, if a message is sent to the Aggregate Actor by a client, the client needs to be assured that the Aggregate will receive the message no matter what. This is where the getOrElse expression comes into play. The Aggregate Actor will be dynamically reloaded, and then the AggregateCache can dispatch the message to it.

The AggregateCache Actor uses an internal worker object:

class AggregateCacheWorker(context: ActorContext) {
  val aggregateInfo = scala.collection.mutable.Map[String, Props]()

  def aggregateOf(props: Props, id: String): ActorRef = {
    if (!aggregateInfo.contains(id)) {
      aggregateInfo(id) = props
      context.actorOf(props, id)
    } else {
      throw new IllegalStateException(s"Aggregate with id $id already exists")
    }
  }

  def propsFor(id: String): Props = {
    if (aggregateInfo.contains(id)) {
      aggregateInfo(id)
    } else {
      throw new IllegalStateException(s"No Props for aggregate of id $id")
    }
  }
}

The AggregateCacheWorker handles support work that is needed by the DomainModel and AggregateCache. This includes creating new Aggregate instances by way of aggregateOf(), and also providing the Props instance, via propsFor(), for each Aggregate Actor that may need to be dynamically reloaded.

Finally, there are a few simple support classes:

case class AggregateRef(id: String, cache: ActorRef) {
  def tell(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }

  def !(message: Any)(implicit sender: ActorRef = null): Unit = {
    cache ! CacheMessage(id, message, sender)
  }
}

case class AggregateType(worker: AggregateCacheWorker, actor: ActorRef)

case class CacheMessage(id: String, actualMessage: Any, sender: ActorRef)

case object ProvideWorker

The AggregateRef is a really important abstraction. Rather than returning an ActorRef when an Aggregate Actor is created, the DomainModel instead returns an AggregateRef. As you can see from its two functions, the AggregateRef knows how to send messages to the underlying Aggregate Actor. However, it’s not a “straight shot,” so to speak, as it is with ActorRef. Instead, all messages sent to Aggregates via AggregateRef’s tell() are first sent to the AggregateCache. This allows for look up and any necessary dynamic loading of the Aggregate Actors into the cache. Messages are sent to AggregateCache as a CacheMessage instance.

So, how does it work? Here’s a runner for the prototype:

class Order(id: String, amount: Double) extends Actor {
  def receive = {
    case p: ProcessOrder =>
      println(s"Processing Order is $p")
      DomainModelPrototype.completedStep()
    case a: Any =>
      println(s"message is $a")
      DomainModelPrototype.completedStep()
  }
}

case class ProcessOrder()

object DomainModelPrototype extends CompletableApp(1) {

  val model = DomainModel("prototype")

  model.registerAggregateType("Order")

  val order = model.aggregateOf("Order", Props(new Order("123", 249.95)), "123")

  order ! ProcessOrder()

  awaitCompletion()

  model.shutdown()

  println("DomainModelPrototype: is completed.")
}

This way the Order Aggregate Actor can, in effect, receive messages directly from a client. This is not actually a direct message send in the sense that ActorRef manages. Even so, ActorRef’s message sends are also not as “direct” as you might like to think. So by means of DomainModel and AggregateRef—the only abstractions of necessity exposed to the client—we get a smooth and seamless interface to Aggregate Actors. Even better, these few abstractions address the way Akka needs to work, but cover over the particulars.

NOTE: If you think the above code is a little hacky, you would be correct. I made some tradeoffs just to express my vision in Scala and Akka without getting bogged down in laborious details. The little bit of code should be revisited for some cleanup. Yet, it still needs a bit more detail for handling distributed caches, etc. Right now it’s just a prototype. What’s good about revisiting the code is that the corrections end up being a good learning example.

Thanks to the Akka Team, including Roland Kuhn and Patrik Nordwall, for ideas about making Akka happy. Their advice was to use some sort of intermediary to dynamically look up child Aggregate Actors, which I implemented as AggregateCache. Yet, it is my use of AggregateRef that allows the intermediary to be removed from the view of clients.

Anyway, I’ve been beating this drum for a while. I feel good that I finally had a few hours to hammer out a prototype solution for this specific set of problems. There is still a large body of work that I have accomplished around DDD with Scala and Akka that I will be presenting ASAP.

Naming “Shadow” Concepts Across Bounded Contexts

I was asked why the concept named Tenant is used in all three Bounded Contexts for the example code for my book, Implementing Domain-Driven Design. Is it because a Tenant is the exact same thing in all three Contexts? Actually this is a good example of when concepts in two or more Bounded Contexts have the same name but have different meanings and uses. Let’s consider how they differ.

In the Identity and Access Context a Tenant is an Aggregate that has a life cycle. It can be disabled and can be used to invite new registrations, and even carry out new registrations. If that Tenant is disabled, no users registered under it will be able to authenticate. (See the discussion in Chapter 5.) Here a Tenant also has a globally (SaaS wide) unique identity.

In the Collaboration Context there is a concept named Tenant, while in the Agile Project Management Context there is a TenantId. In both cases—Tenant and TenantId—these are modeled as Value Objects and used only for identity. One important reason for these identities is to ensure that only users within a specific tenancy (an organization that rents/hires use of the SaaS services and stores its related data) can use objects within that tenant.
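
To make the difference in shape concrete, here is a minimal sketch, written in Scala for consistency with the other code in this collection; the names and members are illustrative only and are not the book's actual classes:

// Identity and Access Context: Tenant is an Aggregate, an Entity with a life cycle.
class Tenant(val tenantId: String, val name: String, private var active: Boolean = true) {
  def deactivate(): Unit = { active = false }  // a disabled Tenant blocks authentication for its users
  def isActive: Boolean = active
}

// Agile Project Management Context: TenantId is a Value Object used only for identity.
case class TenantId(id: String)

// Collaboration Context: its Tenant is likewise a Value Object holding only the identity
// (given a distinct name here only because both sketches share one listing).
case class CollaborationTenant(id: String)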

Why do you think the two teams that respectively work on the Collaboration Context and the Agile Project Management Context didn’t use different names? After all, the teams need not be tied to the original name, Tenant, just because they hold the identity of the Tenant originating in the Identity and Access Context.

True, the teams could have chosen to use the name Subscriber or Company in the two consuming Contexts. They certainly could have done so since the Ubiquitous Language of each team is formed by the team. But what would be the merit in that when Tenant is clearly understood throughout the entire SaaS organization (SaaSOvation)? If new team members are added over the life of each project, it’s just one more detail to have to explain why the Tenant over there is represented as a Subscriber over here, and why our team thinks it is important to distinguish the concept as a Subscriber in our own Context.

In practice, using Subscriber or Company or some other name seems to have little or no justification. In Collaboration, for example, the team doesn’t care at all about subscriptions. Further, Company isn’t a good fit either, because perhaps the Collaboration subscriber is a school and not a commercial organization. In the end, Tenant works just fine everywhere, and actually in this case everyone who works at SaaSOvation will know exactly what it means.

Still, this specific example in no way indicates that concepts in any given consuming Bounded Context that “shadow” a concept in another Bounded Context must reuse the name of the originating concept. The concept name, the identity, and the limited number of attributes consumed by the foreign Context must be carefully chosen by the team that is responsible for how the local concept is modeled.

First Ever DDD eXchange in NYC with Discount

Join me and Skills Matter in NYC on September 17, 2013, for the DDD eXchange. The event begins at 9:00 am.

I will be there with Eric Evans for the first ever DDD eXchange in DUMBO, New York City. After 5 years hosting the leading conference for DDD enthusiasts in London, Skills Matter is now bringing the event to the NYC community.

Eric will be joined by leading DDD and software architecture experts including Greg Young (creator of CQRS), me (the author of Implementing Domain-Driven Design), and Steve Bohlen (DDD NYC User Group founder).

Sign up here and enter the discount code SkillsMatter_Community on the booking form to receive a 20% discount.

I will also be teaching my 3-Day Implementing Domain-Driven Design Workshop in conjunction with Skills Matter on September 18-20, 2013. Register before the early bird special ends.

I look forward to meeting you in NYC!

It’s Not Just About Authorization, It’s About the Ubiquitous Language

Recently someone asked questions about the IDDD sample code and authorization. Basically the question was: why can’t I authorize the user in the model to start a new Forum Discussion? The developer would pass a “session” object into the Forum’s startDiscussion() and have the Forum double dispatch to the “session” to check for authorization:

public class Forum ... {
    ...
    public Discussion startDiscussion(Session aSession, String anAuthorId, ...) {
        aSession.assertAuthorizedDiscussionAuthor(this.tenant(), anAuthorId);
        ...
    }
    ...
}

Take a look at IDDD pages 75-79, and specifically at the refactored code on pages 78-79. It’s not that it is absolutely wrong to, in essence, authorize the user in the model. Rather, it’s that the authorization is done for free when obtaining the necessary Author instance by way of the CollaboratorService.

Further, the point is that the Author is an essential part of the Ubiquitous Language, while the “session” is not. If anything, rather than pass in a “session” object (or worse yet, SecuritySession), pass in a CollaboratorService instead, and double dispatch on the CollaboratorService to get the Author from the Forum’s own Tenant and the anAuthorId String parameter:

public class Forum ... {
    ...
     public Discussion startDiscussion(
        CollaboratorService aCollaboratorService,
        String anAuthorId,
        ...) {
        Author author = aCollaboratorService.authorFrom(this.tenant(), anAuthorId);
        if (author == null) {
            throw new IllegalArgumentException("Not a valid author.");
        }
        ...
    }
    ...
}

In my sample code I show the Forum Application Service obtaining the Author via CollaboratorService. This is not wrong because normally the API would serve as the natural client of the model. When using the Hexagonal (Ports and Adapters) architecture, requests from disparate user agent types would all be adapted to use a single API call. This is based on use case, not on user agent type. But you could pass in CollaboratorService to the Forum if you prefer.