Publishing and subscribing to topics on a broker

Akka Serverless integrates with Google Cloud Pub/Sub to enable asynchronous messaging. Consumers of published messages are guaranteed to receive them at least once. This means that receivers must be able to handle duplicate messages. If you are new to the world of Pub/Sub, many online resources provide tips on handling duplicate messages.

This page describes how to subscribe to published events, but you also need to configure Google Cloud Pub/Sub access for your Akka Serverless project as explained in How to configure a broker.

It is your responsibility to create the topics in Google Cloud Pub/Sub before configuring publishers or subscribers.

Subscribing to a topic’s messages

To receive messages from a Pub/Sub topic, annotate a service method in the Protobuf service definition with the (akkaserverless.method).eventing annotation. Specify the topic name in the in section of the annotation:

syntax = "proto3";

package shopping.cart.actions;

import "akkaserverless/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";

service ShoppingCartAnalyticsService {
  // get ItemAdded from the topic
  rpc ProcessAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = { (1)
      topic: "shopping-cart-events" (2)
    };
  }
}
1 annotate the Protobuf rpc method with (akkaserverless.method).eventing
2 use in and topic to subscribe to a topic

There is nothing specific required in the implementation of ProcessAdded. The implementation will in most cases be an Action and forward a converted message to a different component (eg. an Event Sourced Entity).

Receiving JSON messages

Your Akka Serverless service may subscribe to topics that use messages in JSON format. The messages must have the Content-Type attribute stating application/json.

The Protobuf rpc method receiving these JSON messages must be set up to receive Any.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

package shopping.cart.actions;

service ShoppingCartTopicService {
    rpc JsonFromTopic(google.protobuf.Any) returns (google.protobuf.Empty) {
        option (akkaserverless.method).eventing.in = {
            topic:  "shopping-cart-json"
        };
    }
}

Publishing JSON messages to a topic

By default when publishig a message to a topic, the protobuf message is serialized to bytes and published with the content-type/ce-datacontenttype application/protobuf, and will also contain the metadata entry ce-type specifying the specific protobuf message type.

This is convenient when the consumer is another Akka Serverless service that can handle protobuf messages.

In many cases the consumer may be an external service though, and in such a case another serialization format for the messages can make sense. For such a use case the Akka Serverless Javascript SDK supports emitting JSON messages.

To publish JSON messages to a topic, mark the return type of the message as google.protobuf.Any. The object returned with replies.message() will be serialized to string format and published to the topic with content type Content-Type attribute stating application/json.

import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "akkaserverless/annotations.proto";

message KeyValue {
  string key = 1;
  int32 value = 2;
}

service MyService {
  option (akkaserverless.codegen) = {
    action: {}
  };

  rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) {
    option (akkaserverless.method).eventing.in = {
      topic:  "notifications"
    };
  }

  rpc Produce(KeyValue) returns (google.protobuf.Any) { (1)
    option (akkaserverless.method).eventing.out = { (2)
      topic:  "notifications" (3)
    };
  }

}
1 return google.protobuf.Any from the method
2 annotate the Protobuf rpc method with (akkaserverless.method).eventing
3 use out and topic to publish to a topic

In the service implementation, return an arbitrary object, it will be serialized to JSON for the topic:

async Produce(request) {
  return replies.message({ (2)
    arbitrary: "json"
  })
}

Receiving CloudEvents

Akka Serverless uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.

Describing the structure of the message payload is the CloudEvents feature most important to Akka Serverless.

An example of that is the capability to send serialized Protobuf messages and have Akka Serverless deserialize them accordingly.

To allow proper reading of Protobuf messages from topics, the messages need to specify the message attributes:

  • Content-Type = application/protobuf

  • ce-specversion = 1.0

  • ce-type = fully qualified protobuf message name (eg. shopping.cart.api.TopicOperation)

(The ce- prefixed attributes are part of the CloudEvents specification.)

The Protobuf rpc declaration uses the expected Protobuf message type and specifies the topic to subscribe to. You’ll normally want to share the exact Protobuf message declaration with the sending system.

syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
import "akkaserverless/annotations.proto";

package shopping.cart.api;

message TopicOperation {
    string operation = 1;
}

service ShoppingCartTopicService {

    rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) {
        option (akkaserverless.method).eventing.in = {
            topic:  "shopping-cart-protobuf-cloudevents"
        };
    }
}