Implementing Event Sourced Entities in Java

Event Sourced Entities persist their state with ACID semantics, scale horizontally, and isolate failures. They use the Event Sourcing Model: rather than persisting the current state, they persist all of the events that led to the current state. Akka Serverless stores these events in a journal.

An Event Sourced Entity must not update its in-memory state directly as a result of a command. The handling of a command, if it results in changes being required to state, should emit events. These events will then be received, at which point the in-memory state can and should be changed in response.
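
The flow described above can be sketched in plain Java, without the Akka Serverless SDK. All names here (`CounterSketch`, `Increased`, `handleIncrease`, `applyEvent`, `receive`) are hypothetical and exist only for illustration. The point is that the command handler decides which events to emit and never touches the state, while the event handler is the only place where the state changes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for an entity; not part of the Akka Serverless SDK.
final class CounterSketch {
  // An event records a fact that has already happened.
  static final class Increased {
    final int amount;
    Increased(int amount) { this.amount = amount; }
  }

  private int value = 0;                                      // in-memory state
  private final List<Increased> journal = new ArrayList<>();  // stored events

  // Command handler: validates and returns events; it does not modify `value`.
  List<Increased> handleIncrease(int amount) {
    if (amount <= 0) {
      return Collections.emptyList(); // rejected command: nothing to store
    }
    return Collections.singletonList(new Increased(amount));
  }

  // Event handler: the only place where the in-memory state changes.
  void applyEvent(Increased event) {
    value += event.amount;
  }

  // Glue that a runtime would provide: store the events, then apply them.
  void receive(int amount) {
    for (Increased event : handleIncrease(amount)) {
      journal.add(event);
      applyEvent(event);
    }
  }

  int value() { return value; }
  int journalSize() { return journal.size(); }
}
```

In this sketch a rejected command leaves both the journal and the state unchanged, because the state can only move when an event has been stored.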

When you need to read state in your service, ask yourself: what events should I be listening to? When you need to write state, ask yourself: what events should I be emitting?

To load an Entity, Akka Serverless reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Akka Serverless to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.
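
A minimal sketch of the two recovery paths, in plain Java. The real journal and snapshot store are managed by Akka Serverless; `RecoverySketch`, `Snapshot`, and the integer state and events are assumptions made for the example.

```java
import java.util.List;

// Hypothetical model of recovery; the real journal is managed by Akka Serverless.
final class RecoverySketch {
  // A snapshot pairs a state with the sequence number of the last event it includes.
  static final class Snapshot {
    final int state;
    final int lastSequenceNr;
    Snapshot(int state, int lastSequenceNr) {
      this.state = state;
      this.lastSequenceNr = lastSequenceNr;
    }
  }

  // Event handler: here each event is simply an amount added to the state.
  static int applyEvent(int state, int event) {
    return state + event;
  }

  // Replays the whole journal, starting from the empty state (0).
  static int recoverFromJournal(List<Integer> journal) {
    int state = 0;
    for (int event : journal) {
      state = applyEvent(state, event);
    }
    return state;
  }

  // Starts from the snapshot and replays only the events stored after it.
  // Sequence numbers are 1-based, so event n sits at list index n - 1.
  static int recoverFromSnapshot(Snapshot snapshot, List<Integer> journal) {
    int state = snapshot.state;
    for (int i = snapshot.lastSequenceNr; i < journal.size(); i++) {
      state = applyEvent(state, journal.get(i));
    }
    return state;
  }
}
```

Both paths must produce the same state; the snapshot path only does less work.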

In contrast with typical create, read, update, and delete (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.
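
One way to picture offset tracking is the plain-Java sketch below. It is an illustration under assumptions, not the mechanism Akka Serverless uses internally: `OffsetTrackingSketch`, `append`, `poll`, and the string events are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of offset tracking; names are illustrative only.
final class OffsetTrackingSketch {
  private final List<String> journal = new ArrayList<>();
  // Per consumer: how many journal entries it has already received.
  private final Map<String, Integer> offsets = new HashMap<>();

  void append(String event) {
    journal.add(event);
  }

  // Returns the events the consumer has not seen yet and advances its offset.
  List<String> poll(String consumerId) {
    int from = offsets.getOrDefault(consumerId, 0);
    List<String> unseen = new ArrayList<>(journal.subList(from, journal.size()));
    offsets.put(consumerId, journal.size());
    return unseen;
  }
}
```

Because each consumer has its own offset, a consumer that joins late still receives every event from the start of the journal, while one that is up to date receives nothing twice.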

Event Sourced Entities offer strong consistency guarantees. Akka Serverless distributes Entities across every node in a stateful service deployment, and at any given time each Entity lives on exactly one node. If a command for an Entity arrives at a node that does not host that Entity, the proxy forwards the command to the node that does. This forwarding is transparent; your code does not need to know about it. Because each Entity lives on exactly one node, that node can handle the messages for each Entity sequentially. Hence, there are no concurrency concerns with Event Sourced Entities: each Entity handles one message at a time.

To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing.

Event Sourced Entities persist changes as events and snapshots. Akka Serverless needs to serialize that data to send it to the underlying data store. This is done with Protocol Buffers, using protobuf types.

While Protocol Buffers are the recommended format for persisting state, we recommend that you do not persist your service’s public protobuf messages. Converting from one type to the other may introduce some overhead, but it allows the service’s public interface to evolve independently of the data storage format, which should be private.

The steps necessary to implement an Event Sourced Entity include:

  1. Defining the API and domain objects in .proto files.

  2. Implementing behavior in command and event handlers.

  3. Creating and initializing the Entity.

The sections on this page walk through these steps using a shopping cart service as an example.

Defining the proto files

Our Event Sourced Entity example is a shopping cart service.

The following shoppingcart_domain.proto file defines our "ShoppingCart" Event Sourced Entity. The entity manages the line items of a cart and stores the events ItemAdded and ItemRemoved to represent changes to the cart. Real-world entities store much more data, often structured data, and represent an Entity in the domain-driven design sense of the term.

Java
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// These are the messages that get persisted - the events, plus the current
// state (Cart) for snapshots.

syntax = "proto3";

package com.example.shoppingcart.domain; (1)

import "akkaserverless/annotations.proto"; (2)

option java_outer_classname = "ShoppingCartDomain"; (3)

// Describes how this domain relates to an event sourced entity
option (akkaserverless.file).event_sourced_entity = { (4)
  name: "ShoppingCart" (5)
  entity_type: "shopping-cart" (6)
  state: "Cart" (7)
  events: ["ItemAdded", "ItemRemoved"] (8)
};

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

// The item added event.
message ItemAdded {
  LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
  string productId = 1;
}

// The checked out event.
message CheckedOut {
  int64 checked_out_timestamp = 1;
}

// The shopping cart state.
message Cart {
  repeated LineItem items = 1;
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.domain.
2 Import the Akka Serverless protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartDomain.
4 The protobuf option (akkaserverless.file).event_sourced_entity is specific to code-generation as provided by the Akka Serverless Maven plugin.
5 name denotes the base name for the Event Sourced Entity. The code-generation will create the initial sources ShoppingCart, ShoppingCartTest and ShoppingCartIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
6 entity_type is a unique identifier of the "state storage". The entity name may be changed even after data has been created, but the entity_type can’t.
7 state points to the protobuf message representing the entity’s state which is kept by Akka Serverless. It is stored as snapshots.
8 events points to the protobuf message representing the entity’s events, which are stored by Akka Serverless.
Scala
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// These are the messages that get persisted - the events, plus the current
// state (Cart) for snapshots.

syntax = "proto3";

package com.example.shoppingcart.domain; (1)

import "akkaserverless/annotations.proto"; (2)

// Describes how this domain relates to an event sourced entity
option (akkaserverless.file).event_sourced_entity = { (3)
  name: "ShoppingCart" (4)
  entity_type: "shopping-cart" (5)
  state: "Cart" (6)
  events: ["ItemAdded", "ItemRemoved"] (7)
};

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

// The item added event.
message ItemAdded {
  LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
  string productId = 1;
}

// The shopping cart state.
message Cart {
  repeated LineItem items = 1;
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.shoppingcart.domain.
2 Import the Akka Serverless protobuf annotations or options.
3 The protobuf option (akkaserverless.file).event_sourced_entity is specific to code-generation as provided by the Akka Serverless sbt plugin.
4 name denotes the base name for the Event Sourced Entity. The code-generation will create the initial sources ShoppingCart, ShoppingCartTest and ShoppingCartIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
5 entity_type is a unique identifier of the "state storage". The entity name may be changed even after data has been created, but the entity_type can’t.
6 state points to the protobuf message representing the entity’s state which is kept by Akka Serverless. It is stored as snapshots.
7 events points to the protobuf message representing the entity’s events, which are stored by Akka Serverless.

The shoppingcart_api.proto file defines the commands we can send to the shopping cart service to manipulate or access the cart’s state. They make up the service API:

Java
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the shopping cart entity.

syntax = "proto3";

package com.example.shoppingcart; (1)

import "akkaserverless/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "ShoppingCartApi"; (3)

message AddLineItem { (4)
  string cart_id = 1 [(akkaserverless.field).entity_key = true]; (5)
  string product_id = 2;
  string name = 3;
  int32 quantity = 4;
}

message RemoveLineItem {
  string cart_id = 1 [(akkaserverless.field).entity_key = true];
  string product_id = 2;
}

message GetShoppingCart {
  string cart_id = 1 [(akkaserverless.field).entity_key = true];
}

message LineItem {
  string product_id = 1;
  string name = 2;
  int32 quantity = 3;
}

message Cart { (6)
  repeated LineItem items = 1;
}

service ShoppingCartService { (7)
  option (akkaserverless.service) = { (8)
    type: SERVICE_TYPE_ENTITY
    component: "com.example.shoppingcart.domain.ShoppingCart"
  };

  rpc AddItem (AddLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/add"
      body: "*"
    };
  }

  rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/cart/{cart_id}/items/{product_id}/remove";
  }

  rpc GetCart (GetShoppingCart) returns (Cart) {
    option (google.api.http) = {
      get: "/carts/{cart_id}"
      additional_bindings: {
          get: "/carts/{cart_id}/items"
          response_body: "items"
      } };
  }
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.
2 Import the Akka Serverless protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartApi.
4 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
5 Every Command must contain a string field that contains the entity ID and is marked with the (akkaserverless.field).entity_key option.
6 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
7 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
8 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless Maven plugin and points to the protobuf definition ShoppingCart we’ve seen above (in the com.example.shoppingcart.domain package).
Scala
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the shopping cart entity.

syntax = "proto3";

package com.example.shoppingcart; (1)

import "akkaserverless/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

message AddLineItem { (3)
  string cart_id = 1 [(akkaserverless.field).entity_key = true]; (4)
  string product_id = 2;
  string name = 3;
  int32 quantity = 4;
}

message RemoveLineItem {
  string cart_id = 1 [(akkaserverless.field).entity_key = true];
  string product_id = 2;
}

message GetShoppingCart {
  string cart_id = 1 [(akkaserverless.field).entity_key = true];
}

message LineItem {
  string product_id = 1;
  string name = 2;
  int32 quantity = 3;
}

message Cart { (5)
  repeated LineItem items = 1;
}

service ShoppingCartService { (6)
  option (akkaserverless.service) = { (7)
    type: SERVICE_TYPE_ENTITY
    component: "com.example.shoppingcart.domain.ShoppingCart"
  };

  rpc AddItem (AddLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/add"
      body: "*"
    };
  }

  rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/cart/{cart_id}/items/{product_id}/remove";
  }

  rpc GetCart (GetShoppingCart) returns (Cart) {
    option (google.api.http) = {
      get: "/carts/{cart_id}"
      additional_bindings: {
          get: "/carts/{cart_id}/items"
          response_body: "items"
      } };
  }
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.shoppingcart.
2 Import the Akka Serverless protobuf annotations or options.
3 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
4 Every Command must contain a string field that contains the entity ID and is marked with the (akkaserverless.field).entity_key option.
5 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
6 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
7 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless sbt plugin and points to the protobuf definition ShoppingCart we’ve seen above (in the com.example.shoppingcart.domain package).

Implementing behavior

An Event Sourced Entity implementation is a class where you define how each command is handled. The class ShoppingCart gets generated for us based on the shoppingcart_api.proto and shoppingcart_domain.proto definitions. Once the file exists, it is not overwritten, so you can freely add logic to it. ShoppingCart extends the generated class AbstractShoppingCart which we’re not supposed to change as it gets regenerated in case we update the protobuf descriptors. AbstractShoppingCart contains all method signatures corresponding to the API of the service. If you change the API you will see compilation errors in the ShoppingCart class and you have to implement the methods required by AbstractShoppingCart.

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public class ShoppingCart extends AbstractShoppingCart { (1)
  @SuppressWarnings("unused")
  private final String entityId;

  public ShoppingCart(EventSourcedEntityContext context) { this.entityId = context.entityId(); }

  @Override
  public ShoppingCartDomain.Cart emptyState() { (2)
    return ShoppingCartDomain.Cart.getDefaultInstance();
  }
1 Extends the generated AbstractShoppingCart, which extends EventSourcedEntity.
2 Defines the initial, empty, state that is used before any updates.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
class ShoppingCart(context: EventSourcedEntityContext) extends AbstractShoppingCart { (1)

  @nowarn("msg=unused")
  private val entityId = context.entityId

  override def emptyState: Cart = Cart.defaultInstance (2)
1 Extends the generated AbstractShoppingCart, which extends EventSourcedEntity.
2 Defines the initial, empty, state that is used before any updates.

Every method our Event Sourced Entity offers must be implemented as a command handler.

The code generation creates an implementation class with initial empty implementations, which we discuss below.

Command handlers are implemented in the ShoppingCart class as methods that override abstract methods from AbstractShoppingCart. The methods take the current state as the first parameter and the request message as the second parameter. They return an Effect, which describes the next processing actions, such as emitting events and sending a reply.

When you add or change rpc definitions in the .proto files, including their names, parameters, and return messages, the corresponding methods are regenerated in the abstract class (AbstractShoppingCart). The compiler then flags the methods that are missing or out of date in ShoppingCart, and an IDE can typically fill in the missing method signatures.

Updating state

In the example below, the AddItem service call uses the request message AddLineItem. It returns an Effect to emit an event and then sends a reply once the event is stored successfully. The state is updated by the event handler.

The only way for a command handler to modify the Entity’s state is by emitting an event. Any modifications made directly to the state (or instance variables) from the command handler are not persisted. When the Entity is passivated and reloaded, those modifications will not be present.
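
To see why, consider the following plain-Java sketch. It does not use the SDK, and every name in it (`PassivationSketch`, `sideCounter`, `handleAdd`, `reload`) is hypothetical. The entity keeps one value that is derived from events and one instance variable that a command handler changes directly; reloading rebuilds the entity from the journal alone.

```java
import java.util.List;

// Hypothetical illustration; passivation and reload are simulated by hand.
final class PassivationSketch {
  int quantity = 0;    // state derived from events
  int sideCounter = 0; // instance variable changed directly by a command handler

  // Command handler that does both: mutates a field directly and emits an event.
  int handleAdd(int amount) {
    sideCounter++;  // direct modification: never reaches the journal
    return amount;  // the emitted event (just the amount, for brevity)
  }

  // Event handler: applies a journaled event to the state.
  void applyEvent(int amount) {
    quantity += amount;
  }

  // Simulates reloading a passivated entity: a fresh instance plus replayed events.
  static PassivationSketch reload(List<Integer> journal) {
    PassivationSketch fresh = new PassivationSketch();
    for (int event : journal) {
      fresh.applyEvent(event);
    }
    return fresh;
  }
}
```

After a reload, `quantity` is back at its journaled value while `sideCounter` has silently reset to zero.
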
Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<Empty> addItem(
    ShoppingCartDomain.Cart currentState,
    ShoppingCartApi.AddLineItem command) {
  if (command.getQuantity() <= 0) { (1)
    return effects().error("Quantity for item " + command.getProductId() + " must be greater than zero.");
  }

  ShoppingCartDomain.ItemAdded event = (2)
      ShoppingCartDomain.ItemAdded.newBuilder()
          .setItem(
              ShoppingCartDomain.LineItem.newBuilder()
                  .setProductId(command.getProductId())
                  .setName(command.getName())
                  .setQuantity(command.getQuantity())
                  .build())
          .build();

  return effects()
          .emitEvent(event) (3)
          .thenReply(newState -> Empty.getDefaultInstance()); (4)
}
1 The validation ensures the quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect with effects().error.
2 From the current incoming AddLineItem we create a new ItemAdded event representing the change of the cart.
3 We store the event by returning an Effect with effects().emitEvent.
4 The acknowledgment that the command was successfully processed is only sent if the event was successfully stored, otherwise there will be an error reply.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
override def addItem(currentState: Cart, addLineItem: shoppingcart.AddLineItem): EventSourcedEntity.Effect[Empty] =
  if (addLineItem.quantity <= 0)
    effects.error(s"Quantity for item ${addLineItem.productId} must be greater than zero.") (1)
  else {
    val event = ItemAdded( (2)
      Some(LineItem(
        productId = addLineItem.productId,
        name = addLineItem.name,
        quantity = addLineItem.quantity))
    )
    effects.emitEvent(event) (3)
      .thenReply(_ => Empty.defaultInstance) (4)
  }
1 The validation ensures the quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect with effects.error.
2 From the current incoming AddLineItem we create a new ItemAdded event representing the change of the cart.
3 We store the event by returning an Effect with effects.emitEvent.
4 The acknowledgment that the command was successfully processed is only sent if the event was successfully stored, otherwise there will be an error reply.

The new state is created from the event and the previous state in the event handler. Event handlers are implemented in the ShoppingCart class as methods that override abstract methods from AbstractShoppingCart.

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public ShoppingCartDomain.Cart itemAdded(
    ShoppingCartDomain.Cart currentState,
    ShoppingCartDomain.ItemAdded itemAdded) {
  ShoppingCartDomain.LineItem item = itemAdded.getItem();
  ShoppingCartDomain.LineItem lineItem = updateItem(item, currentState);
  List<ShoppingCartDomain.LineItem> lineItems =
      removeItemByProductId(currentState, item.getProductId());
  lineItems.add(lineItem);
  lineItems.sort(Comparator.comparing(ShoppingCartDomain.LineItem::getProductId));
  return ShoppingCartDomain.Cart.newBuilder().addAllItems(lineItems).build();
}

private ShoppingCartDomain.LineItem updateItem(
    ShoppingCartDomain.LineItem item, ShoppingCartDomain.Cart cart) {
  return findItemByProductId(cart, item.getProductId())
      .map(li -> li.toBuilder().setQuantity(li.getQuantity() + item.getQuantity()).build())
      .orElse(item);
}

private Optional<ShoppingCartDomain.LineItem> findItemByProductId(
    ShoppingCartDomain.Cart cart, String productId) {
  Predicate<ShoppingCartDomain.LineItem> lineItemExists =
      lineItem -> lineItem.getProductId().equals(productId);
  return cart.getItemsList().stream().filter(lineItemExists).findFirst();
}

private List<ShoppingCartDomain.LineItem> removeItemByProductId(
    ShoppingCartDomain.Cart cart, String productId) {
  return cart.getItemsList().stream()
      .filter(lineItem -> !lineItem.getProductId().equals(productId))
      .collect(Collectors.toList());
}
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
override def itemAdded(currentState: Cart, itemAdded: ItemAdded): Cart = {
  val cart = currentState.items.map(lineItem => lineItem.productId -> lineItem).toMap
  val item = cart.get(itemAdded.getItem.productId) match {
    case Some(existing) => existing.copy(quantity = existing.quantity + itemAdded.getItem.quantity)
    case None => itemAdded.getItem
  }
  val updatedCart = cart + (item.productId -> item)
  currentState.withItems(updatedCart.values.toSeq)
}

Note that you have to list the events in the (akkaserverless.file).event_sourced_entity option:

Java
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// Describes how this domain relates to an event sourced entity
option (akkaserverless.file).event_sourced_entity = {
  name: "ShoppingCart"
  entity_type: "shopping-cart"
  state: "Cart"
  events: ["ItemAdded", "ItemRemoved"]
};
Scala
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// Describes how this domain relates to an event sourced entity
option (akkaserverless.file).event_sourced_entity = {
  name: "ShoppingCart"
  entity_type: "shopping-cart"
  state: "Cart"
  events: ["ItemAdded", "ItemRemoved"]
};
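
This page shows handlers only for adding items. As a hedged sketch of how the other declared event, ItemRemoved, might be applied, the following plain-Java code uses a hypothetical `Item` class in place of the generated `ShoppingCartDomain.LineItem`, so that it runs without the SDK. The helper names `contains` and `itemRemoved` are likewise assumptions. A matching command handler could use a check like `contains` to reject the command when the product is not in the cart, and emit the event otherwise.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the generated protobuf classes.
final class RemoveItemSketch {
  static final class Item {
    final String productId;
    final int quantity;
    Item(String productId, int quantity) {
      this.productId = productId;
      this.quantity = quantity;
    }
  }

  // Validation a command handler could run before emitting ItemRemoved.
  static boolean contains(List<Item> cart, String productId) {
    for (Item item : cart) {
      if (item.productId.equals(productId)) {
        return true;
      }
    }
    return false;
  }

  // Event handler: returns a new cart without the removed product.
  // The list passed in as the current state is left untouched.
  static List<Item> itemRemoved(List<Item> cart, String removedProductId) {
    List<Item> updated = new ArrayList<>();
    for (Item item : cart) {
      if (!item.productId.equals(removedProductId)) {
        updated.add(item);
      }
    }
    return updated;
  }
}
```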

Retrieving state

The following example shows the implementation of the GetCart command handler. This handler is read-only; it doesn’t update the state, it only returns it:

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<ShoppingCartApi.Cart> getCart(
    ShoppingCartDomain.Cart currentState, (1)
    ShoppingCartApi.GetShoppingCart command) {
  List<ShoppingCartApi.LineItem> apiItems =
      currentState.getItemsList().stream()
          .map(this::convert)
          .sorted(Comparator.comparing(ShoppingCartApi.LineItem::getProductId))
          .collect(Collectors.toList());
  ShoppingCartApi.Cart apiCart =
          ShoppingCartApi.Cart.newBuilder().addAllItems(apiItems).build(); (2)
  return effects().reply(apiCart);
}

private ShoppingCartApi.LineItem convert(ShoppingCartDomain.LineItem item) {
  return ShoppingCartApi.LineItem.newBuilder()
          .setProductId(item.getProductId())
          .setName(item.getName())
          .setQuantity(item.getQuantity())
          .build();
}
1 The current state is passed to the method.
2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect with effects().reply.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
override def getCart(
    currentState: Cart, (1)
    getShoppingCart: shoppingcart.GetShoppingCart): EventSourcedEntity.Effect[shoppingcart.Cart] = {
  val apiItems = currentState.items.map(convertToApi).sortBy(_.productId)
  val apiCart = shoppingcart.Cart(apiItems) (2)
  effects.reply(apiCart)
}

private def convertToApi(item: LineItem): shoppingcart.LineItem =
  shoppingcart.LineItem(
    productId = item.productId,
    name = item.name,
    quantity = item.quantity
  )
1 The current state is passed to the method.
2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect with effects.reply.

Registering the Entity

To make Akka Serverless aware of the Event Sourced Entity, we need to register it with the service.

The code generation automatically inserts the registration into the call to the generated AkkaServerlessFactory.withComponents method in the Main class.

Java
src/main/java/com/example/shoppingcart/Main.java
/* This code was generated by Akka Serverless tooling.
 * As long as this file exists it will not be re-generated.
 * You are free to make changes to this file.
 */
package com.example.shoppingcart;

import com.akkaserverless.javasdk.AkkaServerless;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.shoppingcart.domain.ShoppingCart;
import com.example.shoppingcart.view.ShoppingCartViewServiceImpl;

public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static AkkaServerless createAkkaServerless() {
    // The AkkaServerlessFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `new AkkaServerless()` instance.
    return AkkaServerlessFactory.withComponents(
        ShoppingCart::new,
        ShoppingCartViewServiceImpl::new
    );
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Akka Serverless service");
    createAkkaServerless().start();
  }
}
Scala
src/main/scala/com/example/shoppingcart/Main.scala
package com.example.shoppingcart

import com.akkaserverless.scalasdk.AkkaServerless
import com.example.shoppingcart.domain.ShoppingCart
import org.slf4j.LoggerFactory

// This class was initially generated based on the .proto definition by Akka Serverless tooling.
//
// As long as this file exists it will not be overwritten: you can maintain it yourself,
// or delete it so it is regenerated as needed.

object Main {

  private val log = LoggerFactory.getLogger("com.example.shoppingcart.Main")

  def createAkkaServerless(): AkkaServerless = {
    // The AkkaServerlessFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `AkkaServerless()` instance.
    AkkaServerlessFactory.withComponents(
      new ShoppingCart(_))
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Akka Serverless service")
    createAkkaServerless().start()
  }
}

By default, the generated constructor has an EventSourcedEntityContext parameter, but you can change this to accept other parameters. If you change the constructor of the ShoppingCart class you will see a compilation error here, and you have to adjust the factory function that is passed to AkkaServerlessFactory.withComponents.

When more components are added, the AkkaServerlessFactory is regenerated, and you have to adjust the registration in the Main class.

Snapshots

Snapshots are an important optimization for Event Sourced Entities that emit many events. Rather than reading the entire journal upon loading or restart, Akka Serverless can restore an Entity from a snapshot.

Snapshots are stored and handled automatically by Akka Serverless without any specific code required. Snapshots are stored after a configured number of events:

src/main/resources/application.conf
akkaserverless.event-sourced-entity.snapshot-every = 100

When the Event Sourced Entity is loaded again, the snapshot will be loaded before any other events are received.
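
As a rough illustration of what this setting buys, the sketch below computes how many events must be replayed on load for a given journal length. It assumes that a snapshot is written exactly when the event count reaches a multiple of `snapshot-every`; that timing, and the names `SnapshotPolicySketch`, `lastSnapshotAt`, and `eventsToReplay`, are assumptions made for the example rather than documented behavior.

```java
// Hypothetical arithmetic model of periodic snapshots.
final class SnapshotPolicySketch {
  // Sequence number of the most recent snapshot, or 0 if none exists yet.
  static long lastSnapshotAt(long eventCount, long snapshotEvery) {
    return (eventCount / snapshotEvery) * snapshotEvery;
  }

  // Events that still have to be replayed after loading that snapshot.
  static long eventsToReplay(long eventCount, long snapshotEvery) {
    return eventCount - lastSnapshotAt(eventCount, snapshotEvery);
  }
}
```

Under this model, with `snapshot-every = 100` and 250 stored events, the latest snapshot sits at event 200 and loading replays 50 events instead of 250.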

Testing the Entity

The following snippet shows how the ShoppingCartTestKit is used to test the ShoppingCart implementation. Akka Serverless provides two main APIs for unit tests, the ShoppingCartTestKit and the EventSourcedResult. The former gives us the overall picture of the entity: its state and all the events produced by all the calls to the Entity. The latter holds only the effects produced by each individual call to the Entity.

Java
src/test/java/com/example/shoppingcart/domain/ShoppingCartTest.java
package com.example.shoppingcart.domain;

import com.akkaserverless.javasdk.testkit.EventSourcedResult;
import com.example.shoppingcart.ShoppingCartApi;
import com.google.protobuf.Empty;
import org.junit.Test;

import java.util.NoSuchElementException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

public class ShoppingCartTest {

    @Test
    public void addItemTest() {

        ShoppingCartTestKit testKit = ShoppingCartTestKit.of(ShoppingCart::new); (1)

        ShoppingCartApi.AddLineItem apples = ShoppingCartApi.AddLineItem.newBuilder().setProductId("idA")
                .setName("apples").setQuantity(1).build();
        EventSourcedResult<Empty> addingApplesResult = testKit.addItem(apples); (2)

        ShoppingCartApi.AddLineItem bananas = ShoppingCartApi.AddLineItem.newBuilder().setProductId("idB")
                .setName("bananas").setQuantity(2).build();
        testKit.addItem(bananas); (3)

        assertEquals(1, addingApplesResult.getAllEvents().size()); (4)
        assertEquals(2, testKit.getAllEvents().size()); (5)

        ShoppingCartDomain.ItemAdded addedApples = addingApplesResult.getNextEventOfType(ShoppingCartDomain.ItemAdded.class); (6)
        assertEquals("apples", addedApples.getItem().getName());
        assertThrows(NoSuchElementException.class, () ->  addingApplesResult.getNextEventOfType(ShoppingCartDomain.ItemAdded.class)); (7)
        assertEquals(Empty.getDefaultInstance(), addingApplesResult.getReply()); (8)

        ShoppingCartDomain.LineItem expectedApples = ShoppingCartDomain.LineItem.newBuilder().setProductId("idA")
                .setName("apples").setQuantity(1).build();
        ShoppingCartDomain.LineItem expectedBananas = ShoppingCartDomain.LineItem.newBuilder().setProductId("idB")
                .setName("bananas").setQuantity(2).build();
        ShoppingCartDomain.Cart expectedState = ShoppingCartDomain.Cart.newBuilder()
                .addItems(expectedApples)
                .addItems(expectedBananas)
                .build();
        assertEquals(expectedState, testKit.getState()); (9)
    }
}
1 creating the TestKit passing the constructor of the Entity.
2 calling the method addItem from the Entity in the ShoppingCartTestKit.
3 calling the method addItem from the Entity in the ShoppingCartTestKit.
4 checking the EventSourcedResult of the first call to addItem.
5 checking all the events emitted across both calls to addItem, as recorded by the TestKit.
6 retrieving the first event generated from the first call to addItem.
7 retrieving the second event generated from the first call to addItem. There is no such event, as our implementation only generates one event when addItem is called.
8 retrieving the response from the call to addItem.
9 retrieving the state of the entity after all the calls to addItem.
Scala
src/test/scala/com/example/shoppingcart/domain/ShoppingCartSpec.scala
package com.example.shoppingcart.domain

import com.akkaserverless.scalasdk.eventsourcedentity.EventSourcedEntity
import com.akkaserverless.scalasdk.testkit.EventSourcedResult
import com.example.shoppingcart
import com.example.shoppingcart.AddLineItem
import com.google.protobuf.empty.Empty
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class ShoppingCartSpec extends AnyWordSpec with Matchers {
  "The ShoppingCart" should {

    "correctly process commands of type AddItem" in {
      val testKit = ShoppingCartTestKit(new ShoppingCart(_)) (1)
      val apples = AddLineItem(productId = "idA", name = "apples", quantity = 1)
      val addingApplesResult = testKit.addItem(apples) (2)

      val bananas = AddLineItem(productId = "idB", name = "bananas", quantity = 2)
      testKit.addItem(bananas) (3)

      addingApplesResult.events should have size 1 (4)
      testKit.allEvents should have size 2 (5)

      val addedApples = addingApplesResult.nextEvent[ItemAdded] (6)
      addedApples.getItem.name shouldBe "apples"
      intercept[NoSuchElementException] { (7)
        addingApplesResult.nextEvent[ItemAdded]
      }
      addingApplesResult.reply shouldBe Empty.defaultInstance (8)

      val expectedState = Cart(Seq(
        LineItem(productId = "idA", name = "apples", quantity = 1),
        LineItem(productId = "idB", name = "bananas", quantity = 2)
      ))
      testKit.currentState shouldBe expectedState (9)
    }
  }
}
1 creating the TestKit passing the constructor of the Entity.
2 calling the method addItem from the Entity in the ShoppingCartTestKit.
3 calling the method addItem from the Entity in the ShoppingCartTestKit.
4 checking the EventSourcedResult of the first call to addItem.
5 checking all the events emitted across both calls to addItem, as recorded by the TestKit.
6 retrieving the first event generated from the first call to addItem.
7 retrieving the second event generated from the first call to addItem. There is no such event, as our implementation only generates one event when addItem is called.
8 retrieving the response from the call to addItem.
9 retrieving the state of the entity after all the calls to addItem.

By default, the integration and unit tests are both invoked by sbt test. To run only the unit tests, run sbt -DonlyUnitTest test or sbt -DonlyUnitTest=true test, or set the value to true in the sbt session with set onlyUnitTest := true and then run test.

EventSourcedResult

Java

Calling a command handler through the TestKit gives us back an EventSourcedResult. This class has methods that we can use to assert the result of handling the command, such as:

  • getReply() - the response from the command handler if there was one; if not, an exception is thrown, failing the test.

  • getAllEvents() - all the events emitted by handling the command.

  • getState() - the state of the entity after applying any events the command handler emitted.

  • getNextEventOfType(ExpectedEvent.class) - checks the next of the emitted events against an event type, returns it for inspection if it matches, or fails the test if it does not. The event is consumed once it is inspected, and the next call looks for a subsequent event.

Scala

Calling a command handler through the TestKit gives us back an EventSourcedResult. This class has methods that we can use to assert the result of handling the command, such as:

  • reply - the response from the command handler if there was one; if not, an exception is thrown, failing the test.

  • events - all the events emitted by handling the command.

  • state - the state of the entity after applying any events the command handler emitted.

  • nextEvent[ExpectedEvent] - checks the next of the emitted events against an event type, returns it for inspection if it matches, or fails the test if it does not. The event is consumed once it is inspected, and the next call looks for a subsequent event.
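
The consuming behavior of the "next event" methods can be modeled with a small cursor over the emitted events. The sketch below is plain Java and is not the TestKit implementation: `EventCursorSketch` is a hypothetical class, and throwing NoSuchElementException on a type mismatch is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical model of the "next event" cursor; not the TestKit implementation.
final class EventCursorSketch {
  private final Iterator<Object> remaining;

  EventCursorSketch(List<Object> events) {
    // Copy the list so later changes to it do not affect the cursor.
    this.remaining = new ArrayList<Object>(events).iterator();
  }

  // Consumes the next event and returns it if it has the expected type.
  <E> E nextEventOfType(Class<E> expected) {
    if (!remaining.hasNext()) {
      throw new NoSuchElementException("No more events");
    }
    Object next = remaining.next();
    if (!expected.isInstance(next)) {
      throw new NoSuchElementException(
          "Expected " + expected.getSimpleName()
              + " but found " + next.getClass().getSimpleName());
    }
    return expected.cast(next);
  }
}
```

Each call advances the cursor, which is why asking for a second event after a command that emitted only one fails.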

ShoppingCartTestKit

Java

This class is generated by Akka Serverless when the project is compiled and is located in target/generated-test-sources/akkaserverless/java/com/example/shoppingcart/domain/. It provides access to all the command handlers of the ShoppingCart entity for unit testing. In addition, it has the following methods:

  • getState() - the current state of the entity; it is updated on each method call that emits events.

  • getAllEvents() - all events emitted since the creation of the testkit instance.

Scala

This class is generated by Akka Serverless when the project is compiled and is located in target/generated-test-sources/akkaserverless/scala/com/example/shoppingcart/domain/. It provides access to all the command handlers of the ShoppingCart entity for unit testing. In addition, it has the following methods:

  • currentState - the current state of the entity; it is updated on each method call that emits events.

  • allEvents - all events emitted since the creation of the testkit instance.