Implementing Value Entities in Java or Scala

Value Entities persist their state on every change. Akka Serverless needs to serialize that state to send it to the underlying data store, and it does so with Protocol Buffers, using protobuf types.

While Protocol Buffers are the recommended format for persisting state, we recommend that you do not persist your service’s public protobuf messages. Converting from one type to the other may introduce some overhead, but it allows the service’s public interface to evolve independently of the data storage format, which should be private.
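
As a rough illustration of that separation, the sketch below uses two hand-written Java classes as stand-ins for generated protobuf messages. ApiCounter plays the role of a public API message and StoredCounter the role of the private persisted state. All class names, the schemaVersion field, and the conversion helpers are hypothetical and exist only for this example. They are not part of the SDK or the generated code.

```java
// Hypothetical stand-in for a public API message (what clients see).
final class ApiCounter {
  final int value;

  ApiCounter(int value) {
    this.value = value;
  }
}

// Hypothetical stand-in for the private, persisted state.
// It can gain fields (here: a version marker) without changing the API.
final class StoredCounter {
  final int value;
  final int schemaVersion;

  StoredCounter(int value, int schemaVersion) {
    this.value = value;
    this.schemaVersion = schemaVersion;
  }
}

// The conversion layer is the only place that knows both shapes.
final class CounterConversions {
  static final int CURRENT_SCHEMA_VERSION = 1;

  static ApiCounter toApi(StoredCounter stored) {
    return new ApiCounter(stored.value);
  }

  static StoredCounter toStored(ApiCounter api) {
    return new StoredCounter(api.value, CURRENT_SCHEMA_VERSION);
  }
}
```

With this layout, a change to the stored shape only touches StoredCounter and CounterConversions, while callers that work with ApiCounter stay unchanged.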

The steps necessary to implement a Value Entity include:

  1. Defining the API and domain objects in .proto files.

  2. Implementing behavior in command handlers.

  3. Creating and initializing the Entity.

The sections on this page walk through these steps using a counter service as an example.

Defining the proto files

Our Value Entity example starts with the "Counter" service as included in the project template.

The following counter_domain.proto file defines our "Counter" Value Entity. The entity stores an integer value as represented in the message CounterState. Real-world entities store much more data, often structured data, because they represent an Entity in the domain-driven design sense of the term.

Java
src/main/proto/com/example/domain/counter_domain.proto
syntax = "proto3";

package com.example.domain; (1)

import "akkaserverless/annotations.proto"; (2)

option java_outer_classname = "CounterDomain"; (3)

option (akkaserverless.file).value_entity = { (4)
    name: "Counter" (5)
    entity_type: "counter" (6)
    state: "CounterState" (7)
};

message CounterState { (8)
  int32 value = 1;
}
1 Any classes generated from this protobuf file will be in the Java package com.example.domain.
2 Import the Akka Serverless protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class CounterDomain.
4 The protobuf option (akkaserverless.file).value_entity is specific to code-generation as provided by the Akka Serverless Maven plugin.
5 name denotes the base name for the Value Entity. The code generation creates the initial sources Counter, CounterTest and CounterIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
6 entity_type is a unique identifier of the "state storage". The entity name may be changed even after data has been created, but the entity_type can’t.
7 state points to the protobuf message representing the Value Entity’s state which is kept by Akka Serverless.
8 The CounterState protobuf message is what Akka Serverless stores for this entity.
Scala
src/main/proto/com/example/domain/counter_domain.proto
syntax = "proto3";

package com.example.domain; (1)

import "akkaserverless/annotations.proto"; (2)

option (akkaserverless.file).value_entity = { (3)
    name: "Counter" (4)
    entity_type: "counter" (5)
    state: "CounterState" (6)
};

message CounterState { (7)
  int32 value = 1;
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.domain.
2 Import the Akka Serverless protobuf annotations, or options.
3 The protobuf option (akkaserverless.file).value_entity is specific to code-generation as provided by the Akka Serverless sbt plugin.
4 name denotes the base name for the Value Entity. The code generation creates the initial sources Counter, CounterTest and CounterIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
5 entity_type is a unique identifier of the "state storage". The entity name may be changed even after data has been created, but the entity_type can’t.
6 state points to the protobuf message representing the Value Entity’s state which is kept by Akka Serverless.
7 The CounterState protobuf message is what Akka Serverless stores for this entity.

The counter_api.proto file defines the commands we can send to the Counter service to manipulate or access the Counter’s state. They make up the service API:

Java
src/main/proto/com/example/counter_api.proto
// This is the public API offered by your entity.
syntax = "proto3";

package com.example; (1)

import "akkaserverless/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "CounterApi"; (3)

message IncreaseValue { (4)
  string counter_id = 1 [(akkaserverless.field).entity_key = true]; (5)
  int32 value = 2;
}

message DecreaseValue {
  string counter_id = 1 [(akkaserverless.field).entity_key = true];
  int32 value = 2;
}

message ResetValue {
  string counter_id = 1 [(akkaserverless.field).entity_key = true];
}

message GetCounter {
  string counter_id = 1 [(akkaserverless.field).entity_key = true];
}

message CurrentCounter { (6)
  int32 value = 1;
}

service CounterService { (7)
  option (akkaserverless.service) = { (8)
    type : SERVICE_TYPE_ENTITY
    component : "com.example.domain.Counter"
  };

  rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
  rpc Reset (ResetValue) returns (google.protobuf.Empty);
  rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
}
1 Any classes generated from this protobuf file will be in the Java package com.example.
2 Import the Akka Serverless protobuf annotations, or options.
3 Let the messages declared in this protobuf file be inner classes to the class CounterApi.
4 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
5 Every Command must contain a string field that contains the entity ID and is marked with the (akkaserverless.field).entity_key option.
6 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
7 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
8 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless Maven plugin and points to the protobuf definition Counter we’ve seen above (in the com.example.domain package).
Scala
src/main/proto/com/example/counter_api.proto
// This is the public API offered by your entity.
syntax = "proto3";

package com.example; (1)

import "akkaserverless/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

message IncreaseValue { (3)
  string counter_id = 1 [(akkaserverless.field).entity_key = true]; (4)
  int32 value = 2;
}

message DecreaseValue {
  string counter_id = 1 [(akkaserverless.field).entity_key = true];
  int32 value = 2;
}

message ResetValue {
  string counter_id = 1 [(akkaserverless.field).entity_key = true];
}

message GetCounter {
  string counter_id = 1 [(akkaserverless.field).entity_key = true];
}

message CurrentCounter { (5)
  int32 value = 1;
}

service CounterService { (6)
  option (akkaserverless.service) = { (7)
    type : SERVICE_TYPE_ENTITY
    component : "com.example.domain.Counter"
  };

  rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
  rpc Reset (ResetValue) returns (google.protobuf.Empty);
  rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.
2 Import the Akka Serverless protobuf annotations or options.
3 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
4 Every Command must contain a string field that contains the entity ID and is marked with the (akkaserverless.field).entity_key option.
5 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
6 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
7 The protobuf option (akkaserverless.service) is specific to code-generation as provided by the Akka Serverless sbt plugin and points to the protobuf definition Counter we’ve seen above (in the com.example.domain package).
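
To make the role of the entity key more concrete, here is a toy in-memory model. It is not how the Akka Serverless runtime is implemented. It only shows the observable idea that each distinct counter_id addresses its own independent state. The class InMemoryCounters and its methods are hypothetical names chosen for this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: keeps one integer per entity key, the way each
// counter_id in a command addresses one Counter entity instance.
final class InMemoryCounters {
  private final Map<String, Integer> stateByKey = new HashMap<>();

  // Models an Increase command: counterId plays the role of the
  // field marked with (akkaserverless.field).entity_key.
  void increase(String counterId, int value) {
    stateByKey.merge(counterId, value, Integer::sum);
  }

  // Models GetCurrentCounter. A key that was never updated reads as 0,
  // matching the default value of an int32 protobuf field.
  int getCurrentCounter(String counterId) {
    return stateByKey.getOrDefault(counterId, 0);
  }
}
```

In this model, two Increase commands for the key "a" accumulate on the same value, while a command for the key "b" starts from its own empty state.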

Implementing behavior

A Value Entity implementation is a class where you define how each command is handled. The class Counter is generated for us based on the counter_api.proto and counter_domain.proto definitions. Once the generated file exists, it is not overwritten, so you can freely add logic to it. Counter extends the generated class AbstractCounter, which we should not change because it is regenerated whenever we update the protobuf descriptors. AbstractCounter contains all method signatures corresponding to the API of the service. If you change the API, you will see compilation errors in the Counter class until you implement the methods required by AbstractCounter.

Java
src/main/java/com/example/domain/Counter.java
/**
 * A Counter represented as a value entity.
 */
public class Counter extends AbstractCounter { (1)

    private final String entityId;

    public Counter(ValueEntityContext context) {
        this.entityId = context.entityId();
    }

    @Override
    public CounterDomain.CounterState emptyState() { (2)
      return CounterDomain.CounterState.getDefaultInstance();
    }
1 Extends the generated AbstractCounter, which extends ValueEntity.
2 Defines the initial, empty, state that is used before any updates.
Scala
src/main/scala/com/example/domain/Counter.scala
/** A value entity. */
class Counter(context: ValueEntityContext) extends AbstractCounter { (1)

  override def emptyState: CounterState = CounterState() (2)
1 Extends the generated AbstractCounter, which extends ValueEntity.
2 Defines the initial, empty, state that is used before any updates.

We need to implement all methods our Value Entity offers as command handlers.

The code generation creates an implementation class with an initial empty implementation, which we’ll discuss below.

Command handlers are implemented in the Counter class as methods that override abstract methods from AbstractCounter. The methods take the current state as the first parameter and the request message as the second parameter. They return an Effect, which describes the next processing actions, such as updating state and sending a reply.

When you add or change rpc definitions in the .proto files, including their names, parameters, and return messages, the corresponding methods are regenerated in the abstract class (AbstractCounter). This means that the compiler will assist you with such changes, and the IDE can typically fill in missing method signatures.
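
The handler contract described above (current state and command in, Effect out) can be modeled in a few lines of plain Java. This is a toy model meant to show the idea only. SketchEffect, SketchRuntime and SketchHandlers are hypothetical names, the state is reduced to a single int, and none of this is the SDK’s actual Effect implementation.

```java
import java.util.function.BiFunction;

// Toy model of what a handler returns, not the SDK's Effect type.
final class SketchEffect {
  final Integer newState; // null means "no state update requested"
  final String reply;     // null when the effect is an error
  final String error;     // null when the effect is a reply

  private SketchEffect(Integer newState, String reply, String error) {
    this.newState = newState;
    this.reply = reply;
    this.error = error;
  }

  static SketchEffect updateThenReply(int newState, String reply) {
    return new SketchEffect(newState, reply, null);
  }

  static SketchEffect replyOnly(String reply) {
    return new SketchEffect(null, reply, null);
  }

  static SketchEffect error(String description) {
    return new SketchEffect(null, null, description);
  }
}

// Toy runtime: it, not the handler, decides what gets stored.
final class SketchRuntime {
  private int storedState = 0;

  String handle(BiFunction<Integer, Integer, SketchEffect> handler, int command) {
    SketchEffect effect = handler.apply(storedState, command);
    if (effect.error != null) {
      return "error: " + effect.error;
    }
    if (effect.newState != null) {
      // Only a state carried by the returned effect is stored.
      storedState = effect.newState;
    }
    return effect.reply;
  }

  int currentStoredState() {
    return storedState;
  }
}

// Handlers are plain functions of (current state, command).
final class SketchHandlers {
  static SketchEffect increase(int currentState, int amount) {
    if (amount < 0) {
      return SketchEffect.error("negative amount");
    }
    return SketchEffect.updateThenReply(currentState + amount, "ok");
  }

  static SketchEffect get(int currentState, int ignored) {
    return SketchEffect.replyOnly(String.valueOf(currentState));
  }
}
```

In this model, calling handle with SketchHandlers::get leaves the stored value untouched, because the returned effect carries no new state. The same holds for an error effect.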

Updating state

In the example below, the Increase service call uses the value from the request message IncreaseValue. It returns an Effect to update the entity state and send a reply.

For value entities, modify the state and then trigger a save of that state in the returned Effect. The Java SDK has an effects().updateState(newState) method for this purpose. If you change the state but do not call updateState in the returned Effect, that state change is lost.
Java
/src/main/java/com/example/domain/Counter.java
@Override
public Effect<Empty> increase(
    CounterDomain.CounterState currentState, CounterApi.IncreaseValue command) {
  if (command.getValue() < 0) { (1)
    return effects().error("Increase requires a positive value. It was [" +
        command.getValue() + "].");
  }
  CounterDomain.CounterState newState =  (2)
          currentState.toBuilder().setValue(currentState.getValue() +
              command.getValue()).build();
  return effects()
          .updateState(newState) (3)
          .thenReply(Empty.getDefaultInstance());  (4)
}
1 The validation rejects negative values. A call with an illegal value fails because the handler returns an Effect created with effects().error.
2 From the current state we create a new state with the increased value.
3 We store the new state by returning an Effect with effects().updateState.
4 The acknowledgment that the command was successfully processed is only sent if the state update was successful. Otherwise, there will be an error reply.
Scala
/src/main/scala/com/example/domain/Counter.scala
override def increase(currentState: CounterState, command: example.IncreaseValue): ValueEntity.Effect[Empty] =
  if (command.value < 0) (1)
    effects.error(s"Increase requires a positive value. It was [${command.value}].")
  else {
    val newState = currentState.copy(value = currentState.value + command.value) (2)
    effects
      .updateState(newState) (3)
      .thenReply(Empty.defaultInstance) (4)
  }
1 The validation rejects negative values. A call with an illegal value fails because the handler returns an Effect created with effects.error.
2 From the current state we create a new state with the increased value.
3 We store the new state by returning an Effect with effects.updateState.
4 The acknowledgment that the command was successfully processed is only sent if the state update was successful. Otherwise, there will be an error reply.
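
The counter_api.proto file also declares Decrease and Reset, whose handlers are not shown on this page. The sketch below captures only their state transitions as pure Java functions, under two assumptions that the source does not state: Decrease validates its input the same way Increase does, and Reset sets the value back to zero. CounterTransitions is a hypothetical helper class, and the exception stands in for what would be an error Effect in a real handler.

```java
// Pure state transitions for the commands the page does not show.
// Assumptions: Decrease validates like Increase, Reset returns to zero.
final class CounterTransitions {

  static int decrease(int currentValue, int amount) {
    if (amount < 0) {
      // In a real handler this would be an error Effect, not an exception.
      throw new IllegalArgumentException(
          "Decrease requires a positive value. It was [" + amount + "].");
    }
    return currentValue - amount;
  }

  static int reset(int currentValue) {
    // The previous value is irrelevant for a reset.
    return 0;
  }
}
```

In an actual handler, the computed value would be wrapped into a new state and returned through the update-then-reply Effect, in the same way as in the Increase handler.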

Retrieving state

The following example shows the implementation of the GetCurrentCounter command handler. This command handler is read-only, meaning that it doesn’t update the state and only returns it:

Java
src/main/java/com/example/domain/Counter.java
@Override
public Effect<CounterApi.CurrentCounter> getCurrentCounter(
        CounterDomain.CounterState currentState, (1)
        CounterApi.GetCounter command) {
    CounterApi.CurrentCounter current =
            CounterApi.CurrentCounter.newBuilder()
                .setValue(currentState.getValue()).build(); (2)
    return effects().reply(current);
}
1 The current state is passed to the method.
2 We use its value to create the CurrentCounter value that is sent as a reply by returning an Effect with effects().reply.
Scala
src/main/scala/com/example/domain/Counter.scala
override def getCurrentCounter(
    currentState: CounterState, (1)
    command: example.GetCounter): ValueEntity.Effect[example.CurrentCounter] =
  effects.reply(CurrentCounter(currentState.value)) (2)
1 The current state is passed to the method.
2 We use its value to create the CurrentCounter value that is sent as a reply by returning an Effect with effects.reply.

Registering the Entity

To make Akka Serverless aware of the Value Entity, we need to register it with the service.

The code generation automatically inserts the registration into the generated AkkaServerlessFactory.withComponents method, which is called from the Main class.

Java
/src/main/java/com/example/Main.java
public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);
  public static AkkaServerless createAkkaServerless() {
    // The AkkaServerlessFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `new AkkaServerless()` instance.
    return AkkaServerlessFactory.withComponents(
            Counter::new);
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Akka Serverless service");
    createAkkaServerless().start();
  }
}
Scala
/src/main/scala/com/example/Main.scala
object Main {

  private val log = LoggerFactory.getLogger("com.example.Main")

  def createAkkaServerless(): AkkaServerless = {
    // The AkkaServerlessFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `AkkaServerless()` instance.
    AkkaServerlessFactory.withComponents(
      new Counter(_))
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Akka Serverless service")
    createAkkaServerless().start()
  }
}

By default, the generated constructor has a ValueEntityContext parameter, but you can change this to accept other parameters. If you change the constructor of the Counter class you will see a compilation error here, and you have to adjust the factory function that is passed to AkkaServerlessFactory.withComponents.

When more components are added, the AkkaServerlessFactory is regenerated, and you have to adjust the registration in the Main class.
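
The relationship between the entity constructor and the factory function can be shown with plain Java functional interfaces. The sketch below is only an analogy. SketchContext, SketchCounter and SketchRegistry are hypothetical stand-ins, and the extra step parameter is invented to show what changes when a constructor takes more than the context.

```java
import java.util.function.Function;

// Hypothetical stand-in for the context passed to an entity.
final class SketchContext {
  final String entityId;

  SketchContext(String entityId) {
    this.entityId = entityId;
  }
}

// Hypothetical stand-in for an entity class.
final class SketchCounter {
  final String entityId;
  final int step;

  // Default-style constructor: takes only the context.
  SketchCounter(SketchContext context) {
    this(context, 1);
  }

  // Customized constructor with an extra (invented) parameter.
  SketchCounter(SketchContext context, int step) {
    this.entityId = context.entityId;
    this.step = step;
  }
}

// Models a registration point that accepts a factory: context -> entity.
final class SketchRegistry {
  static SketchCounter create(
      Function<SketchContext, SketchCounter> factory, String entityId) {
    return factory.apply(new SketchContext(entityId));
  }
}
```

A constructor reference such as SketchCounter::new fits the factory type only while a context-only constructor exists. Once the constructor needs more arguments, a lambda like ctx -> new SketchCounter(ctx, 5) supplies them explicitly.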

Testing the Entity

There are two ways to test a Value Entity:

  * Unit tests, which run the Entity class in the same JVM as the test code itself with the help of a test kit.

  * Integration tests, with the service deployed in a Docker container running the entire service and the test interacting with it over gRPC.

Each way has its benefits. Unit tests are faster and provide more immediate feedback about success or failure, but they can only test a single entity at a time and in isolation. Integration tests, on the other hand, are more realistic and allow many entities to interact with other components inside and outside of the service, for example by actually publishing to a pub/sub topic.

Unit tests

To unit test the Entity, a test kit class is generated for you, along with an example unit test class to start from. Test cases use the test kit to execute commands in the entity, get a ValueEntityResult back, and assert the effects that the command led to: both the reply itself and the update to the state of the Entity.

Java
/src/test/java/com/example/domain/CounterTest.java
import com.akkaserverless.javasdk.testkit.ValueEntityResult;
import com.example.CounterApi;
import com.google.protobuf.Empty;
import org.junit.Test;

import static org.junit.Assert.*;

public class CounterTest {

  @Test
  public void increaseCounterTest() {
    CounterTestKit testKit = CounterTestKit.of(Counter::new);

    CounterApi.IncreaseValue increaseValueCommand = CounterApi.IncreaseValue.newBuilder()
        .setValue(1)
        .build();
    ValueEntityResult<Empty> result1 = testKit.increase(increaseValueCommand);
    assertEquals(Empty.getDefaultInstance(), result1.getReply());
    assertEquals(1, testKit.getState().getValue());

    // one more time
    ValueEntityResult<Empty> result2 = testKit.increase(increaseValueCommand);
    assertEquals(Empty.getDefaultInstance(), result2.getReply());
    assertEquals(2, testKit.getState().getValue());
  }
}

The unit tests can be run from Maven using mvn test or, if you prefer, from inside your IDE the same way you usually run tests.

Scala
/src/test/scala/com/example/domain/CounterSpec.scala
class CounterSpec extends AnyWordSpec with Matchers {

  "Counter" must {

    "handle command Increase" in {
      val testKit = CounterTestKit(new Counter(_))

      val result1 = testKit.increase(IncreaseValue(value = 1))
      result1.reply shouldBe Empty.defaultInstance

      // one more time
      val result2 = testKit.increase(IncreaseValue(value = 1))
      result2.reply shouldBe Empty.defaultInstance
      testKit.currentState().value shouldBe 2
    }
  }
}

By default, the integration and unit tests are both invoked by sbt test. To run only the unit tests, run sbt -DonlyUnitTest test or sbt -DonlyUnitTest=true test. Alternatively, set the value to true in the sbt session with set onlyUnitTest := true and then run test.

Integration tests

An example integration test class to start from is also generated for you. It uses an AkkaServerlessTestKitResource (Java) or an AkkaServerlessTestKit (Scala) to start Docker containers and interacts with the entity through an actual gRPC client.

Java
/src/it/java/com/example/domain/CounterIntegrationTest.java
public class CounterIntegrationTest {

  /**
   * The test kit starts both the service container and the Akka Serverless proxy.
   */
  @ClassRule
  public static final AkkaServerlessTestKitResource testKit =
          new AkkaServerlessTestKitResource(Main.createAkkaServerless());

  /**
   * Use the generated gRPC client to call the service through the Akka Serverless proxy.
   */
  private final CounterService client;

  public CounterIntegrationTest() {
      client = testKit.getGrpcClient(CounterService.class);
  }

  @Test
  public void increaseOnNonExistingEntity() throws Exception {
      String entityId = "new-id";
      client.increase(CounterApi.IncreaseValue.newBuilder().setCounterId(entityId).setValue(42).build())
               .toCompletableFuture().get(5, SECONDS);
      CounterApi.CurrentCounter reply = client.getCurrentCounter(CounterApi.GetCounter.newBuilder().setCounterId(entityId).build())
              .toCompletableFuture().get(5, SECONDS);
      assertThat(reply.getValue(), is(42));
  }
}

The integration tests are in a dedicated Maven profile named it and can be run using mvn verify -Pit.

Scala
/src/test/scala/com/example/CounterServiceIntegrationSpec.scala
package com.example

import com.akkaserverless.scalasdk.testkit.AkkaServerlessTestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Millis
import org.scalatest.time.Seconds
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpec

// This class was initially generated based on the .proto definition by Akka Serverless tooling.
//
// As long as this file exists it will not be overwritten: you can maintain it yourself,
// or delete it so it is regenerated as needed.

class CounterServiceIntegrationSpec
    extends AnyWordSpec
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  implicit private val patience: PatienceConfig =
    PatienceConfig(Span(5, Seconds), Span(500, Millis))

  private val testKit = AkkaServerlessTestKit(Main.createAkkaServerless()).start()
  import testKit.executionContext

  private val client = testKit.getGrpcClient(classOf[CounterService])

  "CounterService" must {

    "Increase and decrease a counter" in {
      val counterId = "42"

      val updateResult =
        for {
          _ <- client.increase(IncreaseValue(counterId, 42))
          done <- client.decrease(DecreaseValue(counterId, 32))
        } yield done

      updateResult.futureValue

      val getResult = client.getCurrentCounter(GetCounter(counterId))
      getResult.futureValue.value shouldBe(42-32)
    }

  }

  override def afterAll(): Unit = {
    testKit.stop()
    super.afterAll()
  }
}

By default, the integration and unit tests are both invoked by sbt test.

Note that since these tests require building and deploying a Docker image as well as actual (local) networking, they depend on Docker being available on the development machine. This also means they take a bit more time to execute than the unit tests, which are completely local and do not involve networking.
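
Because integration tests involve real networking, the Java example above blocks on each asynchronous client call with a timeout (toCompletableFuture().get(5, SECONDS)). The following sketch isolates that pattern using only the JDK. AwaitSketch and its methods are hypothetical helpers, and the two fake calls merely stand in for a gRPC client that replies immediately or never replies.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

final class AwaitSketch {
  // Blocks for at most timeoutSeconds, like the .get(5, SECONDS) calls
  // in the Java integration test. Throws TimeoutException on expiry.
  static <T> T await(CompletionStage<T> stage, long timeoutSeconds) throws Exception {
    return stage.toCompletableFuture().get(timeoutSeconds, TimeUnit.SECONDS);
  }

  // Stand-in for an asynchronous client call that completes right away.
  static CompletionStage<Integer> fakeGetCurrentCounter() {
    return CompletableFuture.completedFuture(42);
  }

  // Stand-in for a call that never completes, e.g. an unreachable service.
  static CompletionStage<Integer> neverCompletes() {
    return new CompletableFuture<>();
  }
}
```

Bounding every wait this way means that a service that fails to start shows up as a test failure after a few seconds instead of a test run that hangs.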