Akka from Local to Cloud

An evolution from a local application written in Java to an actor-based one using the Akka Actor System. A few examples show how to evolve a simple application, in an easy way, into one that is scalable and cloud ready. This is a showcase of an almost complete course and workshop on how to build an Actor System from scratch.

 

Actor Model – long story short

Following Hewitt, Bishop, and Steiger’s 1973 publication, Irene Greif developed an operational semantics for the Actor Model as part of her doctoral research.

Akka was a part of the Typesafe Platform (formed 2006-2010). Currently it is part of the Lightbend product family.

The name "Akka" comes from a goddess in Sami mythology who represented all wisdom and beauty in the world.

 

https://akka.io/

 

Actor Model

An actor is a computational entity that, in response to a message it receives, can concurrently:

  • send a finite number of messages to other actors;
  • create a finite number of new actors;
  • designate the behavior to be used for the next message it receives.

 

There is no assumed sequence to the above actions and they could be carried out in parallel.

 

 

An actor is an entity encapsulating:

  • State
  • Mailbox
  • Behavior
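To make these three parts concrete, here is a toy sketch in plain Java (no Akka, and not how Akka is actually implemented): the mailbox is a queue, the state is a private field, and the behavior is a loop that processes one message at a time.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy actor: private state, a mailbox and a behavior loop on its own thread.
// Purely illustrative - Akka's real implementation is far more sophisticated.
class ToyActor {

	private final BlockingQueue<String> mailbox = new LinkedBlockingQueue<>();
	private volatile int messagesSeen = 0; // state, touched only by the actor's thread

	ToyActor() {
		Thread worker = new Thread(() -> {
			try {
				while (true) {
					onReceive(mailbox.take()); // behavior: one message at a time
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		});
		worker.setDaemon(true);
		worker.start();
	}

	// "tell" only enqueues - the caller never blocks on processing
	void tell(String message) {
		mailbox.add(message);
	}

	int processedCount() {
		return messagesSeen;
	}

	private void onReceive(String message) {
		messagesSeen++;
		System.out.println("Processed message #" + messagesSeen + ": " + message);
	}
}
```

Because the behavior loop handles one message at a time, the state needs no locks, which is exactly the guarantee the Actor Model gives you.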

 

Advantages of using Actor Model

  • Concurrent application logic
  • Asynchronous execution of chains of operations
  • Scheduling work once or repeatedly
  • Role-based behavior control
  • Scale-in & scale-out
  • Distribution of work

 

Like in real life – Events, Domains and Actors

The Actor Model can be compared to real life. Messages are like pieces of information sent by a teacher to a student. The student can do some work and generate new information of its own. One teacher can supervise many students. Developing an application based on real-life examples keeps it very close to the business. That can be a way to achieve Domain-Driven Design and an event-driven system.

After this article, see the one that describes Clean Architecture in Software – Simplified Concept, and you should understand how easy and quick building truly enterprise-ready applications can be.

 

Example of Local Application

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;

import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.grizzly.http.server.NetworkListener;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import org.glassfish.jersey.server.ResourceConfig;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

import utils.EnglishNumberToWords;
import utils.GrizzlyHTTPServer;

public class LocalExample {

	private Map<Long, String> mapWithData = new HashMap<>();

	public void start() throws Exception {

		System.out.println("Will initialize web server...");

		final HttpServer httpServer = GrizzlyHTTPServer.prepareWebServer(
				new ResourceConfig().register(new JacksonJsonProvider()).register(new ExampleApi()), "0.0.0.0", 8888);
		ThreadPoolConfig threadPoolConfig = ThreadPoolConfig.defaultConfig().
				setPoolName("api-worker-pool").setCorePoolSize(50).setMaxPoolSize(100);
		NetworkListener listener = httpServer.getListeners().iterator().next();
		listener.getTransport().setWorkerThreadPoolConfig(threadPoolConfig);

		httpServer.start();

		// Keep the main thread alive so the server keeps running
		while (true) {
			Thread.sleep(10);
		}

	}

	@Path("/")
	public class ExampleApi {

		public static final String APPLICATION_JSON = "application/json";

		@GET
		@Consumes({ APPLICATION_JSON })
		@Produces({ APPLICATION_JSON })
		@Path("/")
		public void element(@Context final ContainerRequestContext requestContext, @Suspended final AsyncResponse asyncResponse) {

			String transaction = UUID.randomUUID().toString();
			System.out.println("Rest request for GET /element with uuid=" + transaction);
			Long id = ThreadLocalRandom.current().nextLong(100000);
			mapWithData.put(id, EnglishNumberToWords.convert(id));
			String answer = mapWithData.get(id);
			// Server.doLongOperation();
			asyncResponse.resume(answer);

		}

	}

	public static void main(final String[] args) throws Exception {
		LocalExample localExample = new LocalExample();
		localExample.start();
	}
}
One Class Backend Application

This example shows just about the simplest application that can be developed. You can see that an HTTP server is started on port 8888 and serves one endpoint that generates a random number and returns it in the answer. Moreover, the endpoint converts this number to its word form and additionally writes it to an in-memory map from number to word.
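The EnglishNumberToWords utility is referenced but never listed in the article. A minimal, hypothetical stand-in (covering only 0-99, unlike the real utility, which must handle larger numbers) might look like this:

```java
// Hypothetical minimal stand-in for the article's utils.EnglishNumberToWords.
// Handles 0-99 only; the real utility presumably covers a much larger range.
class EnglishNumberToWords {

	private static final String[] ONES = {
			"zero", "one", "two", "three", "four", "five", "six", "seven",
			"eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen",
			"fifteen", "sixteen", "seventeen", "eighteen", "nineteen" };

	private static final String[] TENS = {
			"", "", "twenty", "thirty", "forty", "fifty",
			"sixty", "seventy", "eighty", "ninety" };

	static String convert(long number) {
		if (number < 0 || number > 99) {
			throw new IllegalArgumentException("this sketch supports 0-99 only");
		}
		if (number < 20) {
			return ONES[(int) number];
		}
		int tens = (int) (number / 10);
		int rest = (int) (number % 10);
		return rest == 0 ? TENS[tens] : TENS[tens] + "-" + ONES[rest];
	}
}
```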

 

After starting this application and making a request to

http://localhost:8888/

the user will see a response containing words like "one", "two", "three", and so on.

To use this as a real example, just build the project with Gradle and run it.

// first gradle 
group 'org.example.akka.workshop'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'idea'

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    mavenCentral()
    mavenLocal()
}

task wrapper(type: Wrapper) {
    gradleVersion = '2.4'
}

dependencies {

    compile group: 'com.google.guava', name: 'guava', version: '19.0'

	// Grizzly HTTP 2 server dependencies
    compile group: 'org.glassfish.jersey.containers', name: 'jersey-container-grizzly2-http', version: '2.18'
    compile group: 'org.glassfish.jersey.media', name: 'jersey-media-moxy', version: '2.18'
    compile group: 'org.glassfish.jersey.media', name: 'jersey-media-jaxb', version: '2.18'
    compile group: 'org.glassfish.jersey.media', name: 'jersey-media-json-jackson', version: '2.18'
    compile group: 'org.glassfish.jersey.media', name: 'jersey-media-multipart', version: '2.18'
    compile group: 'org.glassfish.jersey.media', name: 'jersey-media-json-processing', version: '2.18'
    compile group: 'org.glassfish.jersey.ext.rx', name: 'jersey-rx-client-rxjava', version: '2.18'
    compile group: 'org.glassfish.jersey.bundles', name: 'jaxrs-ri', version: '2.18'

	// JSON - XML for Grizzly Server
    compile 'com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:2.6.3'
    compile 'com.fasterxml.jackson.jaxrs:jackson-jaxrs-xml-provider:2.6.3'
    compile 'com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:2.6.3'
    compile 'com.fasterxml.jackson.core:jackson-core:2.5.3'
    compile 'com.fasterxml.jackson.core:jackson-databind:2.5.3'
    compile group: 'com.sun.xml.bind', name: 'jaxb-impl', version: '2.2.4-1'

}
build.gradle

 

This example local application is not asynchronous. The only async part present is the async response in the Grizzly HTTP server, which is transparent to the developer. Your first application fits in one class; you can split it into a few classes using a simple MVC pattern if you wish.
If so, you will end up with a Gradle-wrapped project, a Grizzly NIO HTTP server with async enabled, and a controller backed by a local static map.

 

When the application is ready, you can run a performance check on it. Engulf can be used for this.

Example results from my tests

 

I was able to achieve 2044 requests per second with average response time of 355 ms.

 

First Actor Model Application using Akka

Concept of roles in an actor-based application

Let's build the same thing using Akka and the Actor Model. It should be very simple. First, we have to think a little and design our system and its actors.

 

In this design we will use a Frontend Actor, a Backend Actor and a Database Actor.

Let's split responsibilities and divide them among actors. The Frontend Actor will accept requests from users and render messages back to them. The Backend Actor will do all the calculations. The Database Actor will write or keep information (like database writes/reads). It is not yet real Event-Driven Design or CQRS, but notice that the next steps may lead toward it. We are simply getting ready.

Let's start building an Akka-ready application

To do that you need to:

  • add dependencies
  • prepare first configuration
  • start in code an actor system
    compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: akkaVersion
    compile group: 'com.typesafe.akka', name: 'akka-remote_2.11', version: akkaVersion
    compile group: 'com.typesafe.akka', name: 'akka-cluster_2.11', version: akkaVersion
    compile group: 'com.typesafe.akka', name: 'akka-cluster-tools_2.11', version: akkaVersion
    compile group: 'com.typesafe.akka', name: 'akka-slf4j_2.11', version: akkaVersion
    compile group: 'com.typesafe.akka', name: 'akka-cluster-sharding_2.11', version: akkaVersion
    compile group: 'com.typesafe.akka', name: 'akka-distributed-data-experimental_2.11', version: akkaVersion
build.gradle

I have added all needed dependencies in this step so they do not bother us later.

 

The simplest configuration for local use:

akka {

  actor {
    provider = "akka.actor.LocalActorRefProvider"
  }

}
application.conf

 

Another important thing: we have to remember to start the Actor System.

//Start local akka system
		ActorSystem cluster_server = ActorSystem.create("example-system", ConfigFactory.load("example_local.conf"));

 

Now we start the actors we have defined. Take a look at how the application's Main may look now.

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinPool;

import com.typesafe.config.ConfigFactory;

public class AkkaLocalExample {

	public void start() {

		//Start local akka system
		ActorSystem cluster_server = ActorSystem.create("example-system", ConfigFactory.load("example_local.conf"));

		//Create and start actor for logging purposes
		cluster_server.actorOf(Props.create(LoggingActor.class), "logger");

		cluster_server.actorSelection("/user/logger")
				.tell("Logging actor system should be initialized now - it's our first message", ActorRef.noSender());

		// Create http server
		ActorRef ref = cluster_server.actorOf(Props.create(RestServerActor.class));
		ref.tell(new InitMessage(), ActorRef.noSender());

		// Create transaction actor for frontend
		// NONE - will use creation of actor in rest layer

		// Create actor for backend transactions
		cluster_server.actorOf(new RoundRobinPool(100).props(Props.create(BackendExecutionActor.class)), "backend");

		// Create some data structure
		cluster_server.actorOf(Props.create(LocalHashMapDataActor.class), "data");

	}

	public static void main(final String[] args) {
		AkkaLocalExample localExample = new AkkaLocalExample();
		localExample.start();
	}
}
Main.java

In the preceding class, the actors have been started and their main properties defined. Look very carefully at the code above. You should observe that it contains more than just the actors described at the beginning. A router is also an actor. In the example, "BackendExecutionActor" is not named "backend"; the round-robin pool router that controls "BackendExecutionActor" is the one named "backend".
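Stripped of Akka, the routing decision a round-robin pool makes is very small: each incoming message goes to the next worker in turn. A conceptual plain-Java sketch (not Akka's actual router code):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Conceptual round-robin router: message N goes to worker N modulo pool size.
class RoundRobinRouter<T> {

	private final List<T> workers;
	private final AtomicLong counter = new AtomicLong();

	RoundRobinRouter(List<T> workers) {
		this.workers = workers;
	}

	// Pick the worker that should receive the next message.
	T next() {
		return workers.get((int) (counter.getAndIncrement() % workers.size()));
	}
}
```

In the example application, the pool does the same thing with 100 BackendExecutionActor instances behind the name "backend".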

In Main, the first messages have also been sent to the actors.

Constructing actors in a local system

 

To process messages, actors have to be present. Their construction may differ considerably depending on the needs.

import akka.actor.UntypedActor;

public class LoggingActor extends UntypedActor {

	public LoggingActor() {
		System.out.println("CONSTRUCTOR: Creating local logging actor");
	}

	@Override public void onReceive(Object message) throws Throwable {
		System.out.println("Actor=" + sender().path() + " Message=" + message);
	}

}
LoggingActor

 

import java.util.HashMap;
import java.util.Map;

import akka.actor.AbstractActorWithStash;
import akka.japi.pf.ReceiveBuilder;

import utils.EnglishNumberToWords;

public class LocalHashMapDataActor extends AbstractActorWithStash {

	private Map<Long, String> mapWithData = new HashMap<>();

	public LocalHashMapDataActor() {
		System.out.println("CONSTRUCTOR: Actor with Data Map kept in local memory");
		context().actorSelection("/user/logger").tell("CONSTRUCTOR: Actor with Data Map kept in local memory", self());

		receive(ReceiveBuilder
				.match(GetElementByIdRequest.class, this::getElement)
				.matchAny(m -> System.out.println("Received unknown Message of type: " + m.getClass()))
				.build());
	}

	private void getElement(GetElementByIdRequest message) {
		System.out.println("Data action for GET /element uuid=" + message.getTransactionId());
		context().actorSelection("/user/logger").tell("Data action for GET /element uuid=" + message.getTransactionId(), self());

		mapWithData.put(message.getId(), EnglishNumberToWords.convert(message.getId()));

		sender().tell(new AnswerMessage<>(mapWithData.get(message.getId()), message.getTransactionId()), self());
	}

}
LocalHashMapDataActor

 

Some actors need parameters during construction. The way of constructing them is a little different.

		ActorRef request = actorSystem.actorOf(Props.create(FrontendRequestActor.class, transaction, System.currentTimeMillis(), asyncResponse));
A FrontendRequestActor created per request

 

Such an actor class accepts constructor parameters and uses them until the end of its lifecycle.

import javax.ws.rs.container.AsyncResponse;

import akka.actor.UntypedActor;

public class FrontendRequestActor extends UntypedActor {

	private final String transactionId;
	private final long timeOfStart;
	private AsyncResponse restRequest;

	public FrontendRequestActor(String transactionId, long timeOfStart, AsyncResponse asyncResponse) {
		System.out.println("CONSTRUCTOR: Some frontend actor for request: " + transactionId);
		context().actorSelection("/user/logger").tell("CONSTRUCTOR: Some frontend actor for request: " + transactionId, self());
		this.transactionId = transactionId;
		this.timeOfStart = timeOfStart;
		this.restRequest = asyncResponse;
	}

	@Override public void onReceive(Object message) throws Throwable {
		if (message instanceof GetElementByIdRequest) {
			processRequestFromRestToBackendForGetElement((GetElementByIdRequest) message);
		} else if (message instanceof AnswerMessage) {
			processAnswerFromBackendToRest((AnswerMessage) message);
		} else {
			System.out.println("Unknown message: " + message);
		}
	}

	private void processRequestFromRestToBackendForGetElement(GetElementByIdRequest restRequest) {
		System.out.println("Processing rest request for GET /element in akka actor for uuid=" + transactionId);
		context().actorSelection("/user/logger").tell("Processing rest request for GET /element in akka actor for uuid=" + transactionId, self());

		context().actorSelection("/user/backend").tell(restRequest, self());
	}

	private void processAnswerFromBackendToRest(AnswerMessage message) {
		long timeOfExecution = System.currentTimeMillis() - timeOfStart;
		System.out.println("Processing rest answer for uuid=" + transactionId + " with time=" + timeOfExecution);
		context().actorSelection("/user/logger")
				.tell("Processing rest answer for uuid=" + transactionId + " with time=" + timeOfExecution, self());

		restRequest.resume(message.getAnswer());
	}

}
FrontendRequestActor

 

There is also another type of actor, not tied to the business logic itself. These can be treated as the infrastructure part.

import java.io.IOException;

import org.glassfish.grizzly.http.server.HttpServer;
import org.glassfish.grizzly.http.server.NetworkListener;
import org.glassfish.grizzly.threadpool.ThreadPoolConfig;
import org.glassfish.jersey.server.ResourceConfig;

import akka.actor.AbstractActorWithStash;
import akka.japi.pf.ReceiveBuilder;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

import utils.GrizzlyHTTPServer;

public class RestServerActor extends AbstractActorWithStash {

	public RestServerActor() {
		System.out.println("CONSTRUCT - Creating rest server actor");
		context().actorSelection("/user/logger").tell("CONSTRUCT - Creating rest server actor", self());

		receive(ReceiveBuilder
				.match(InitMessage.class, this::initWebServer)
				.matchAny(m -> System.out.println("Received unknown Message of type: " + m.getClass()))
				.build());
	}

	private void initWebServer(InitMessage message) throws IOException {
		System.out.println("Will initialize web server...");
		context().actorSelection("/user/logger").tell("Will initialize web server...", self());

		final HttpServer httpServer = GrizzlyHTTPServer.prepareWebServer(
				new ResourceConfig().register(new JacksonJsonProvider()).register(new ExampleApi(context().system())), "0.0.0.0", 8888);
		ThreadPoolConfig threadPoolConfig = ThreadPoolConfig.defaultConfig().
				setPoolName("api-worker-pool").setCorePoolSize(50).setMaxPoolSize(100);
		NetworkListener listener = httpServer.getListeners().iterator().next();
		listener.getTransport().setWorkerThreadPoolConfig(threadPoolConfig);

		httpServer.start();

	}

}
Actor that starts the REST services

 

The REST server serves one endpoint. For each request to that endpoint, an actor instance is created that is responsible for accepting the request and presenting the answer. Taking care of such construction could also be delegated to a dispatcher.

 

import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;

@Path("/")
public class ExampleApi {

	public static final String APPLICATION_JSON = "application/json";

	private ActorSystem actorSystem;

	public ExampleApi(ActorSystem system) {
		actorSystem = system;
	}

	@GET
	@Consumes({ APPLICATION_JSON })
	@Produces({ APPLICATION_JSON })
	@Path("/")
	public void element(@Context final ContainerRequestContext requestContext, @Suspended final AsyncResponse asyncResponse) {

		String transaction = UUID.randomUUID().toString();
		System.out.println("Rest request for GET /element with uuid=" + transaction);
		Long id = ThreadLocalRandom.current().nextLong(10);
		ActorRef request = actorSystem.actorOf(Props.create(FrontendRequestActor.class, transaction, System.currentTimeMillis(), asyncResponse));
		request.tell(new GetElementByIdRequest(transaction, id), ActorRef.noSender());

	}

}
API

 

Make use of Patterns

Sometimes there is a need for blocking code in an application; it depends on where async meets sync. To make such a connection, Akka introduces Patterns.
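The same sync/async boundary can be pictured with plain JDK futures. This is only an analogy under assumed names (AskAnalogy is not part of the article's code, and CompletableFuture is not what Akka uses internally): blocking on the future corresponds to awaiting an ask, while attaching a continuation corresponds to Patterns.pipe.

```java
import java.util.concurrent.CompletableFuture;

// JDK-only analogy for the ask/pipe pattern; the names here are illustrative.
class AskAnalogy {

	// "ask": kick off an asynchronous computation and get a future answer.
	static CompletableFuture<String> ask() {
		return CompletableFuture.supplyAsync(() -> "forty-two");
	}

	// Blocking bridge: wait for the async answer. Acceptable at the edge of
	// the system, but something to avoid inside an actor's message handler.
	static String askAndWait() {
		return ask().join();
	}

	// Non-blocking bridge: attach a continuation instead of waiting,
	// which is roughly what Patterns.pipe(...).to(...) arranges for an actor.
	static void askAndPipe() {
		ask().thenAccept(result -> System.out.println("piped result = " + result));
	}
}
```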

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithStash;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;

public class BackendExecutionActor extends AbstractActorWithStash {

	public BackendExecutionActor() {
		System.out.println("CONSTRUCT - Creating backend actor");
		context().actorSelection("/user/logger").tell("CONSTRUCT - Creating backend actor", self());

		receive(ReceiveBuilder
				.match(GetElementByIdRequest.class, this::getElementById)
				.matchAny(m -> System.out.println("Received unknown Message of type: " + m.getClass()))
				.build());
	}

	private void getElementById(GetElementByIdRequest message) {

		System.out.println("Backend action for GET /element uuid=" + message.getTransactionId());
		context().actorSelection("/user/logger").tell("Backend action for GET /element uuid=" + message.getTransactionId(), self());

		ActorRef requestSender = sender();
		ActorSelection actorForData = context().system().actorSelection("/user/data");

		Patterns.pipe(Patterns.ask(actorForData, message, 30000), context().system().dispatcher())
				.to(context().system().actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
					receive(ReceiveBuilder.match(AnswerMessage.class, answer -> {
								//Server.doLongOperation();
								sendAnswerBack(answer, requestSender);
							}).build()
					);
				}})));

	}

	private void sendAnswerBack(AnswerMessage answer, ActorRef restActor) {
		System.out.println("Backend will answer for uuid=" + answer.getTransactionId());
		context().actorSelection("/user/logger").tell("Backend will answer for GET /elements uuid=" + answer.getTransactionId(), self());
		restActor.tell(answer, self());
	}
}
BackendExecutionActor

 

Messages are like Commands and Queries

import java.io.Serializable;

public class InitMessage implements Serializable {
}
import java.io.Serializable;

public class GetElementByIdRequest implements Serializable {

	private final String transactionId;

	private final Long id;

	public GetElementByIdRequest(String transactionId, Long id) {
		this.transactionId = transactionId;
		this.id = id;
	}

	public String getTransactionId() {
		return transactionId;
	}

	public Long getId() {
		return id;
	}
}
import java.io.Serializable;

public class AnswerMessage<DATA> implements Serializable {

	private final DATA answer;

	private final String transactionId;

	public AnswerMessage(DATA answer, String transactionId) {
		this.answer = answer;
		this.transactionId = transactionId;
	}

	public DATA getAnswer() {
		return answer;
	}

	public String getTransactionId() {
		return transactionId;
	}
}

 

The answer to a message is sent from within an actor; just take a look above at BackendExecutionActor.

 

The Actor Model at work in the example

The first local Actor System example above has demonstrated several things:

  • Constructing actors
  • Routing
  • Naming actors in a system
  • Blocking patterns
  • Addressing and sending messages

After running the Main class of this example, you get a fully working application that uses the same business logic as the very first simple application. Its endpoint returns the word that represents a random number.

A performance check shows that, even though many actors are created and even simple logs are sent through the actor messaging system, performance is still very good. It is a path towards going fully async in the future.

 

Over a short measurement period, performance decreased a little. There is more going on inside, so that is natural. However, a longer run shows that overall performance increases.

 

Going into Cluster

The Akka Actor System offers a way to scale our application easily. It is the most important advantage of this style of programming. Until now, the example Akka-based backend was running only locally. In real situations, however, you would probably need to split your application's deployments across different servers depending on the role of each component.

There are two main ways to achieve network functionality: Remote and Cluster. With Remote, addressing has to be done by hostnames or IP addresses. With Cluster, Akka takes care of the infrastructural parts of addressing and offers name-based routing. As in most cases there is no need to use Remote, the example uses Cluster.

 

Divide application into modules and start cluster

We should start by dividing the existing local application into logical modules. Based on the existing local-only Akka backend, we will divide it into these modules:

  • Akka Cluster Server
  • Logger
  • Frontend (Rest)
  • Backend
  • Data

Starting a cluster in code should be very easy.

public void start() {

		AkkaClusterExampleRoleServer server = new AkkaClusterExampleRoleServer();
		server.start();

		AkkaClusterExampleRoleLogging logger = new AkkaClusterExampleRoleLogging();
		logger.start();

		AkkaClusterExampleRoleFrontend frontend = new AkkaClusterExampleRoleFrontend();
		frontend.start();

		AkkaClusterExampleRoleBackend backend = new AkkaClusterExampleRoleBackend();
		backend.start();

		AkkaClusterExampleRoleDataLocal data = new AkkaClusterExampleRoleDataLocal();
		data.start();

	}
How to start the very first cluster

 

Akka uses HOCON for configuration. By default some config file should be present. We have to ensure that our cluster is properly configured.

akka {

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = on

    netty.tcp {
      hostname = "192.168.10.111"
      port = 2550
    }

  }

  cluster {

    roles = ["server"]

    seed-nodes = ["akka.tcp://example-system@192.168.10.111:2550"]

    auto-down-unreachable-after = 120s

    receptionist {
      # Actor name of the ClusterReceptionist actor, /user/receptionist
      name = receptionist

      # Start the receptionist on members tagged with this role.
      # All members are used if undefined or empty.
      role = "server"

      # The receptionist will send this number of contact points to the client
      number-of-contacts = 1

    }

  }

  extensions = ["akka.cluster.client.ClusterClientReceptionist", "akka.cluster.pubsub.DistributedPubSub"]

}
Config for a Server Role

 

In Akka there is a need to define where the server is located, since it offers a server-client way of communication. Addressing and main routing are taken care of by the Receptionist, an actor provided by Akka itself.

Now it is time to provide configuration for the logical modules of our application as well.

akka {

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off

    netty.tcp {
      hostname = "127.0.0.1"
      port = 2554
    }

  }

  cluster {

    roles = ["logging"]

    seed-nodes = ["akka.tcp://example-system@127.0.0.1:2550"]

  }

  extensions = ["akka.cluster.client.ClusterClientReceptionist", "akka.cluster.pubsub.DistributedPubSub"]

}
Logging Module Config
akka {

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off

    netty.tcp {
      hostname = "192.168.10.111"
      port = 2551
    }

  }

  cluster {

    roles = ["frontend"]

    seed-nodes = ["akka.tcp://example-system@192.168.10.111:2550"]

  }

  extensions = ["akka.cluster.client.ClusterClientReceptionist", "akka.cluster.pubsub.DistributedPubSub"]
}
Frontend Module Config
akka {

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off

    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }

  }

  cluster {

    roles = ["backend"]

    seed-nodes = ["akka.tcp://example-system@127.0.0.1:2550"]

  }

  extensions = ["akka.cluster.client.ClusterClientReceptionist", "akka.cluster.pubsub.DistributedPubSub"]
}
Backend Module Config
akka {

  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }

  remote {
    log-remote-lifecycle-events = off

    netty.tcp {
      hostname = "192.168.10.111"
      port = 2553
    }

  }

  cluster {

    roles = ["data"]

    seed-nodes = ["akka.tcp://example-system@192.168.10.111:2550"]

  }

  extensions = ["akka.cluster.client.ClusterClientReceptionist", "akka.cluster.pubsub.DistributedPubSub"]
}
Data Module Config

 

Roles have been used to properly divide the application into modules. In this way we may start application components based on the roles defined in configuration.

Starting an actor system with given properties may be done by passing the name of a configuration file.

public void start() {

		//Start local akka system
		ActorSystem akka = ActorSystem.create("example-system", ConfigFactory.load("example_cluster_server.conf"));

	}
How to start the actor system

 

Register an Actor in Cluster and Route local messages

This part is the most difficult to understand for people who are not familiar with the server-client architecture pattern. In short: readable names were already assigned to actors in the previous example, and now we have to register them in two places:

  • cluster (in receptionist)
  • clients
Registration in a cluster

The names already in use can now guarantee a smooth switch from the old local Akka code to the new cluster-based code.

public class AkkaClusterExampleRoleLogging {

	public void start() {

		//Start local akka system
		ActorSystem akka = ActorSystem.create("example-system", ConfigFactory.load("example_cluster_logging.conf"));

		//Create and start actor for logging purposes
		ActorRef loggingActor = akka.actorOf(Props.create(LoggingActor.class), "logger");

		// Make this actor visible to the cluster (/system/receptionist)
		ClusterClientReceptionist.get(akka).registerService(loggingActor);

		// This tells the local actor; since we don't have a wrapper here, it will not use a remote actor.
		akka.actorSelection("/user/logger")
				.tell("Logging actor system should be initialized now - it's our first message", ActorRef.noSender());

	}

	private static Set<ActorPath> initialContacts() {
		return Sets.newHashSet(ActorPaths.fromString(Server.ADDRESS));
	}

	public static void main(final String[] args) {
		AkkaClusterExampleRoleLogging localExample = new AkkaClusterExampleRoleLogging();
		localExample.start();
	}

}
Logging Role Initialize

In this code the Logging Module (running on this node) creates an actor named "logger" and registers it in the cluster under the same name. This way, the Receptionist running on the "server" module knows where to forward cluster messages addressed to "logger".
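Conceptually, the receptionist acts like a name-to-handler registry: services register under a name, and clients forward messages by that name without knowing where the service lives. A hypothetical plain-Java model of the idea (not Akka's actual ClusterClientReceptionist):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Toy model of the receptionist idea: register by name, forward by name.
class ToyReceptionist {

	private final Map<String, Consumer<Object>> services = new ConcurrentHashMap<>();

	// Cluster side: register a named service, like registerService(loggingActor).
	void registerService(String name, Consumer<Object> handler) {
		services.put(name, handler);
	}

	// Client side: forward a message to the named service, like ClusterClient.Send.
	void send(String name, Object message) {
		Consumer<Object> handler = services.get(name);
		if (handler != null) {
			handler.accept(message);
		}
	}
}
```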

Registering a routing from local name to cluster

Since the existing code uses an actor name (like "logger") to send messages, every client that uses such a name needs to register where those messages should be forwarded. For this reason, every client registers locally an abstract actor named "logger" that receives messages in the client's own system and forwards them to the cluster. A wrapper pattern has been used to do that.

		// Logging part - initialize router for cluster client and wrapper for message that will send to all cluster /user/logger
		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.SendToAll("/user/logger", messageToForward), sender())
			).build());
		}}), "logger");

		// Tell to logger cluster as usually
		akka.actorSelection("/user/logger").tell("Frontend akka role is started", ActorRef.noSender());
How any client may send messages to the logger

 

Starting all modules

public class AkkaClusterExampleRoleFrontend {

	public void start() {
		//Start local akka system
		ActorSystem akka = ActorSystem.create("example-system", ConfigFactory.load("example_cluster_frontend.conf"));

		// Start a frontend http rest server
		// TODO: (TASK 2) - Make the Rest layer accessible (by member up/down or some registration command) through a load-balancer actor with an HTTP proxy, so that requests go to one computer which then balances them to the other REST servers
		ActorRef ref = akka.actorOf(Props.create(RestServerActor.class));
		ref.tell(new InitMessage(), ActorRef.noSender());

		// Start of actor router to the cluster
		ActorRef clientToCluster = akka.actorOf(ClusterClient.props(ClusterClientSettings.create(akka).withInitialContacts(initialContacts())));

		// Start a router and wrapper to the backend, because the frontend actors will have to reach the backend actors.
		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.Send("/user/backend", messageToForward), sender())
			).build());
		}}), "backend");

		// Logging part - initialize router for cluster client and wrapper for message that will send to all cluster /user/logger
		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.SendToAll("/user/logger", messageToForward), sender())
			).build());
		}}), "logger");

		// Tell to logger cluster as usually
		akka.actorSelection("/user/logger").tell("Frontend akka role is started", ActorRef.noSender());

	}

	private static Set<ActorPath> initialContacts() {
		return Sets.newHashSet(ActorPaths.fromString(Server.ADDRESS));
	}

	public static void main(final String[] args) {
		AkkaClusterExampleRoleFrontend frontend = new AkkaClusterExampleRoleFrontend();
		frontend.start();
	}

}
Frontend
public class AkkaClusterExampleRoleDataLocal {

	public void start() {
		//Start local akka system
		ActorSystem akka = ActorSystem.create("example-system", ConfigFactory.load("example_cluster_data.conf"));

		// Start a local hash map with data. Be aware - it's not yet a cluster singleton! But it can easily be changed to a cluster singleton!
		// TODO: (TASK 1) - Make this actor available as cluster singleton. You have half an hour.
		ActorRef dataActor = akka.actorOf(Props.create(LocalHashMapDataActor.class), "data");

		// Make this actor visible to the cluster (via /system/receptionist)
		ClusterClientReceptionist.get(akka).registerService(dataActor);

		// Logging part - initialize the cluster client and a wrapper actor that forwards messages to every /user/logger in the cluster
		ActorRef clientToCluster = akka
				.actorOf(ClusterClient.props(ClusterClientSettings.create(akka).withInitialContacts(initialContacts())));

		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.SendToAll("/user/logger", messageToForward), sender())
			).build());
		}}), "logger");

		// Tell the cluster logger as usual
		akka.actorSelection("/user/logger").tell("Data akka role is started", ActorRef.noSender());

	}

	private static Set<ActorPath> initialContacts() {
		return Sets.newHashSet(ActorPaths.fromString(Server.ADDRESS));
	}

	public static void main(final String[] args) {
		AkkaClusterExampleRoleDataLocal data = new AkkaClusterExampleRoleDataLocal();
		data.start();
	}

}

 

public class AkkaClusterExampleRoleBackend {

	public void start() {
		//Start local akka system
		ActorSystem akka = ActorSystem.create("example-system", ConfigFactory.load("example_cluster_backend.conf"));

		// Start a backend cluster
		ActorRef routerToBackendActors = akka.actorOf(new SmallestMailboxPool(100).withResizer(new DefaultResizer(1, 100))
				.props(Props.create(BackendExecutionActor.class)), "backend");

		ClusterClientReceptionist.get(akka).registerService(routerToBackendActors);

		// Start of actor router to the cluster
		ActorRef clientToCluster = akka.actorOf(ClusterClient.props(ClusterClientSettings.create(akka).withInitialContacts(initialContacts())));

		// Start a router and wrapper to the data actor, because the backend actors will need to reach the data.
		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.Send("/user/data", messageToForward, false), sender())
			).build());
		}}), "data");

		// Logging part - initialize the cluster client and a wrapper actor that forwards messages to every /user/logger in the cluster
		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.SendToAll("/user/logger", messageToForward), sender())
			).build());
		}}), "logger");

		// Tell the cluster logger as usual
		akka.actorSelection("/user/logger").tell("Backend akka role is started", ActorRef.noSender());
	}

	private static Set<ActorPath> initialContacts() {
		return Sets.newHashSet(ActorPaths.fromString(Server.ADDRESS));
	}

	public static void main(final String[] args) {
		AkkaClusterExampleRoleBackend backend = new AkkaClusterExampleRoleBackend();
		backend.start();
	}

}

 

 

How to send messages in a cluster

Local Message
akka.actorSelection("/user/logger").tell("Backend akka role is started", ActorRef.noSender());

 

Message to One in Cluster
messageToForward -> clientToCluster.tell(new ClusterClient.Send("/user/data", messageToForward, false), sender())

 

Message to All in Cluster
messageToForward -> clientToCluster.tell(new ClusterClient.SendToAll("/user/logger", messageToForward), sender())

 

Scalable group of actors in cluster using routing

localAkkaSystem.actorOf(new RoundRobinPool(100).props(Props.create(BackendExecutionActor.class)), "backend");
ActorRef routerToBackendActors = akka.actorOf(new SmallestMailboxPool(100).withResizer(new DefaultResizer(1, 1000))
		.props(Props.create(BackendExecutionActor.class)), "backend");

There are different ways of creating routers and their policies. Note that we are sending messages to the router and it automatically forwards them. No instance of BackendExecutionActor is itself named “backend”; a router has been registered under that name, and this router manages forwarding each message to one of the configured number of BackendExecutionActor instances.
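As an aside, the core of what a routing pool does can be sketched in plain Java, without Akka. Everything below (the Router class, the routee lambdas) is illustrative, not Akka API; the point is only that callers talk to the router, which picks the next routee for each message.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Illustrative sketch of round-robin routing (not Akka code):
// the router owns N routees and forwards each message to the next one in turn.
public class RoundRobinSketch {

    static class Router {
        private final List<Consumer<String>> routees;
        private final AtomicInteger next = new AtomicInteger(0);

        Router(List<Consumer<String>> routees) {
            this.routees = routees;
        }

        // Callers only ever talk to the router, just as in Akka you send
        // messages to the router actor, never to a routee directly.
        void tell(String message) {
            int index = Math.floorMod(next.getAndIncrement(), routees.size());
            routees.get(index).accept(message);
        }
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        Router router = new Router(List.of(
                m -> log.append("routee-0:").append(m).append(" "),
                m -> log.append("routee-1:").append(m).append(" ")));
        router.tell("a");
        router.tell("b");
        router.tell("c");
        System.out.println(log.toString().trim());
    }
}
```

A SmallestMailboxPool, as used in the example, differs only in the selection step: instead of cycling, it picks the routee with the fewest pending messages.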

Run an example and check performance

 

When you run the above example on a single computer, you may observe that the response time of a single request is higher and overall performance has decreased. However, scaling is now nearly linear. Depending on the configuration and the number of actor instances, you will quickly reach better performance than the Local Code Example and will be able to scale almost linearly. Try it out!

 

Cluster Example as a Workshop

If you have a second computer available, or a few of them, or a friend (or a few friends) with a computer, then just use the existing Cluster Example. By running the backend on several computers with different roles, you will be able to measure performance and see how the magic happens as messages are sent between computers in the cluster. Just remember to adjust the configuration.

public class Server {
   public static final String ADDRESS = "akka.tcp://example-system@127.0.0.1:2550/system/receptionist";

}
seed-nodes = ["akka.tcp://example-system@127.0.0.1:2550"]
netty.tcp {
  hostname = "127.0.0.1"
  port = 2554
}
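For a second machine joining the same cluster, only the local bind address changes, while the seed node keeps pointing at the first machine. A hypothetical fragment for node two might look like this (the 192.168.1.x addresses are placeholders for your real ones):

```
seed-nodes = ["akka.tcp://example-system@192.168.1.10:2550"]
netty.tcp {
  hostname = "192.168.1.11"   # this machine's own address
  port = 2554
}
```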

 

Sharding of Data

Until now, to manage data we used a Data Actor whose infrastructural responsibility was to read from and write to a repository.

Why not use the Actor Model to model the actor itself as the Data (or a Person)?

Let’s imagine that our Word (Number) is also an Actor. In a real-world example it might correspond to something like a client or a device. It is closely related to the Domain-Driven Design pattern called “Aggregates”.

 

Akka offers Sharding for exactly this purpose. More about this topic is covered in other posts.

Proxy and Shard with a Guardian and Router
public class ShardingExample {

	public void start() {

		AkkaClusterExampleRoleServer server = new AkkaClusterExampleRoleServer();
		server.start();

		AkkaClusterExampleRoleLogging logger = new AkkaClusterExampleRoleLogging();
		logger.start();

		AkkaClusterExampleRoleFrontend frontend = new AkkaClusterExampleRoleFrontend();
		frontend.start();

		AkkaClusterExampleRoleBackend backend = new AkkaClusterExampleRoleBackend();
		backend.start();

		AkkaShardingExampleRoleDataProxy dataProxy = new AkkaShardingExampleRoleDataProxy();
		dataProxy.start();

		AkkaShardingExampleRoleDataNode dataNode = new AkkaShardingExampleRoleDataNode();
		dataNode.start();

	}

	public static void main(final String[] args) {
		ShardingExample shardingExample = new ShardingExample();
		shardingExample.start();
	}
}
public class AkkaShardingExampleRoleDataProxy {

	public void start() {
		//Start local akka system
		ActorSystem akka = ActorSystem.create("example-system", ConfigFactory.load("example_sharding_data_proxy.conf"));

		// Logging part - initialize the cluster client and a wrapper actor that forwards messages to every /user/logger in the cluster
		ActorRef clientToCluster = akka
				.actorOf(ClusterClient.props(ClusterClientSettings.create(akka).withInitialContacts(initialContacts())));

		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.SendToAll("/user/logger", messageToForward), sender())
			).build());
		}}), "logger");

		// Initialize the proxy (client) to the sharding, with the message extractor
		ActorRef shardingClient = ClusterSharding.lookup().get(akka).startProxy("DataAkkaActor", Optional.of("data_sharding"), messageExtractor);

		// The wrapper that presents itself as "the data" and forwards messages to the shard region
		ActorRef routerToSharding = akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> tellToSharding(messageToForward, sender(), shardingClient)
			).build());

		}}), "data");

		ClusterClientReceptionist.get(akka).registerService(routerToSharding);

		// Tell the cluster logger as usual
		akka.actorSelection("/user/logger").tell("Data Proxy akka role is started", ActorRef.noSender());

	}

	public void tellToSharding(Object messageToForward, ActorRef sender, ActorRef shardingClient) {
		System.out.println("Will forward message " + messageToForward);
		shardingClient.tell(messageToForward, sender);
	}

	private static Set<ActorPath> initialContacts() {
		return Sets.newHashSet(ActorPaths.fromString(Server.ADDRESS));
	}

	ShardRegion.MessageExtractor messageExtractor = new ShardRegion.HashCodeMessageExtractor(30) {

		@Override
		public String entityId(Object message) {
			if (message instanceof GetElementByIdRequest) {
				return String.valueOf(((GetElementByIdRequest) message).getId());
			} else {
				return message.getClass().getSimpleName();
			}
		}

		@Override
		public Object entityMessage(Object message) {
			return message;
		}

		@Override
		public String shardId(Object messageObject) {
			return super.shardId(messageObject);
		}

	};

	public static void main(final String[] args) {
		AkkaShardingExampleRoleDataProxy data = new AkkaShardingExampleRoleDataProxy();
		data.start();
	}

}
public class AkkaShardingExampleRoleDataNode {

	public void start() {
		//Start local akka system
		ActorSystem akka = ActorSystem.create("example-system", ConfigFactory.load("example_sharding_data.conf"));

		ActorRef clientToCluster = akka
				.actorOf(ClusterClient.props(ClusterClientSettings.create(akka).withInitialContacts(initialContacts())));

		// Logging part - initialize the cluster client and a wrapper actor that forwards messages to every /user/logger in the cluster
		akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> clientToCluster.tell(new ClusterClient.SendToAll("/user/logger", messageToForward), sender())
			).build());
		}}), "logger");

		// Tell the cluster logger as usual
		akka.actorSelection("/user/logger").tell("Data Sharding akka role is started", ActorRef.noSender());

		// Sharding server start
		ClusterShardingSettings settings = ClusterShardingSettings.create(akka);
		ClusterSharding.lookup().get(akka).start("DataAkkaActor", Props.create(DataActor.class), settings, messageExtractor);

	}

	private static Set<ActorPath> initialContacts() {
		return Sets.newHashSet(ActorPaths.fromString(Server.ADDRESS));
	}

	public static void main(final String[] args) {
		AkkaShardingExampleRoleDataNode data = new AkkaShardingExampleRoleDataNode();
		data.start();
	}

	ShardRegion.MessageExtractor messageExtractor = new ShardRegion.HashCodeMessageExtractor(30) {

		@Override
		public String entityId(Object message) {
			if (message instanceof GetElementByIdRequest) {
				return String.valueOf(((GetElementByIdRequest) message).getId());
			} else {
				return message.getClass().getSimpleName();
			}
		}

		@Override
		public Object entityMessage(Object message) {
			return message;
		}

		@Override
		public String shardId(Object messageObject) {
			return super.shardId(messageObject);
		}

	};

}

 

Note that inside a shard it is easy to manage your data. The actor living in the shard may also send messages to another infrastructural actor that writes snapshots to any database, or even use an Event-Based Architecture.
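The snapshot idea can be sketched in plain Java (illustrative only; in Akka the writer would be another actor and the hand-off a tell()): the entity keeps its state and, after every change, pushes a copy to an infrastructural writer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Illustrative sketch (plain Java, not Akka): an entity that, besides
// answering requests, pushes a snapshot of its state to an
// infrastructural "writer" after every change. Names are hypothetical.
public class SnapshotSketch {

    static class Entity {
        private final String id;
        private final BiConsumer<String, String> snapshotWriter;
        private String state;

        Entity(String id, BiConsumer<String, String> snapshotWriter) {
            this.id = id;
            this.snapshotWriter = snapshotWriter;
        }

        void update(String newState) {
            this.state = newState;
            // fire-and-forget snapshot, like telling a persistence actor
            snapshotWriter.accept(id, state);
        }

        String state() {
            return state;
        }
    }

    public static void main(String[] args) {
        Map<String, String> fakeDb = new HashMap<>();
        Entity entity = new Entity("42", fakeDb::put);
        entity.update("forty two");
        System.out.println(fakeDb.get("42")); // prints "forty two"
    }
}
```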

 

Modularity in Sharding as an option in a cluster
// Sharding server start
		ClusterShardingSettings settings = ClusterShardingSettings.create(akka);
		ClusterSharding.lookup().get(akka).start("DataAkkaActor", Props.create(DataActor.class), settings, messageExtractor);

// Initialize the proxy (client) to the sharding, with the message extractor
		ActorRef shardingClient = ClusterSharding.lookup().get(akka).startProxy("DataAkkaActor", Optional.of("data_sharding"), messageExtractor);

		// The wrapper that presents itself as "the data" and forwards messages to the shard region
		ActorRef routerToSharding = akka.actorOf(Props.create(AbstractActor.class, () -> new AbstractActor() {{
			receive(ReceiveBuilder.matchAny(
					messageToForward -> tellToSharding(messageToForward, sender(), shardingClient)
			).build());

		}}), "data");

		ClusterClientReceptionist.get(akka).registerService(routerToSharding);

 

Location of Actor with Data in a Shard

The message-id extraction method can be used to decide where and how to find the actor in the shard that keeps the data.

ShardRegion.MessageExtractor messageExtractor = new ShardRegion.HashCodeMessageExtractor(30) {

		@Override
		public String entityId(Object message) {
			if (message instanceof GetElementByIdRequest) {
				return String.valueOf(((GetElementByIdRequest) message).getId());
			} else {
				return message.getClass().getSimpleName();
			}
		}

		@Override
		public Object entityMessage(Object message) {
			return message;
		}

		@Override
		public String shardId(Object messageObject) {
			return super.shardId(messageObject);
		}

	};
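To make the mapping concrete, here is a plain-Java sketch (illustrative, not Akka's internal code) of how a hash-code based extractor can fold an entity id into one of the 30 shards used above. The same id always yields the same shard id, which is what lets the cluster locate the actor holding that entity's data.

```java
// Illustrative sketch of hash-based shard resolution (not Akka internals).
public class ShardIdSketch {

    static final int MAX_SHARDS = 30; // matches HashCodeMessageExtractor(30) above

    static String shardId(String entityId) {
        // Deterministic: the same entity id always maps to the same shard,
        // so the entity actor is always looked up in one place.
        return String.valueOf(Math.abs(entityId.hashCode() % MAX_SHARDS));
    }

    public static void main(String[] args) {
        System.out.println("entity 42 -> shard " + shardId("42"));
        System.out.println("entity 42 -> shard " + shardId("42")); // same shard again
    }
}
```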

 

Data and its distribution

public class DataActor extends AbstractActorWithStash {

	private String textToNumber;

	public DataActor() {
		System.out.println("CONSTRUCTOR: Actor with Data in sharding");
		context().actorSelection("/user/logger").tell("CONSTRUCTOR: Actor with Data in sharding", self());

		receive(ReceiveBuilder
				.match(GetElementByIdRequest.class, this::getElement)
				.matchAny(m -> System.out.println("Received unknown Message of type: " + m.getClass()))
				.build());
	}

	private void getElement(GetElementByIdRequest message) {
		System.out.println("Data in sharding action for GET /element uuid=" + message.getTransactionId());
		context().actorSelection("/user/logger").tell("Data action for GET /element uuid=" + message.getTransactionId(), self());

		if (textToNumber == null) {
			textToNumber = EnglishNumberToWords.convert(message.getId());
			System.out.println("I will create new: " + textToNumber);

		} else {
			System.out.println("I will reuse: " + textToNumber);
		}

		sender().tell(new AnswerMessage<>(textToNumber, message.getTransactionId()), self());
	}

}

 

Performance

 

Conclusion

Akka may be a very good way to achieve different patterns including:

  • CQRS
  • Event Sourcing
  • Domain Driven Design
  • Clean Architecture in Software

If you look at the way these examples were written, you can see that the transformation was easy, because from beginning to end the code and its structure were modular and ready for transformation.

In big systems with hundreds of thousands of clients, this is one of the few ways to keep good performance and avoid bottlenecks.

 
