Basic Sample
Note
- The full source code is on GitHub
- The cluster and cluster client use Duty Cycle Idle Strategies with a sleep of 1 millisecond, which adds latency and limits performance.
To create a basic clustered Aeron service, we will do the following:
- Define a clustered service. This is the service holding the business logic within the cluster, and it will run in a Clustered Service Container.
- Build up the configuration for a single node cluster
- Configure a client to communicate with the cluster
- Send a message to the cluster, and poll for responses
Converting the Replicated State Machine
The sample state machine from the Replicated State Machine example can easily be adapted to run within an Aeron Clustered Service Container. Commands to the state machine are no longer messages, but raw long and integer values that are moved over the wire via Simple Binary Encoding flyweights. The SBE message definitions are on GitHub.
In earlier editions of this page, the RSM cluster logic and some messaging logic were intertwined, which proved painful with larger replicated state machines.
The updated cluster-hosted replicated state machine logic is now kept separate from the messaging logic; the full listing is in the source on GitHub.
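Since the original listing is not reproduced here, the following is a minimal sketch of what a state machine separated from messaging can look like. The class and method names (RsmStateMachineSketch, add, multiply, takeSnapshot, loadFromSnapshot) are hypothetical and not the cookbook's actual code; the only assumptions are that the state is a single long and that commands are add and multiply.

```java
// Hypothetical sketch: class and method names are illustrative, not the
// cookbook's actual code. The state is a single long, and the class has no
// dependency on Aeron, SBE, or client sessions.
final class RsmStateMachineSketch
{
    private long currentValue;

    // Apply an add command and return the new value.
    long add(final long amount)
    {
        currentValue += amount;
        return currentValue;
    }

    // Apply a multiply command and return the new value.
    long multiply(final long factor)
    {
        currentValue *= factor;
        return currentValue;
    }

    long getCurrentValue()
    {
        return currentValue;
    }

    // The entire state fits in one long, so a snapshot is just that value.
    long takeSnapshot()
    {
        return currentValue;
    }

    // Restore state from a previously taken snapshot value.
    void loadFromSnapshot(final long snapshotValue)
    {
        currentValue = snapshotValue;
    }
}
```

Adding 1 through 6 to a fresh instance leaves the value at 21, which matches the "adding 6, value is now 21" line in the cluster log of the first run. Keeping the class free of transport concerns is what makes it easy to test and to snapshot.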
To connect the replicated state machine above to the Aeron Cluster infrastructure, we need to adapt messages, since all that Aeron provides is a single callback within the ClusteredService, onSessionMessage, which receives every client message.
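The shape of that single entry point can be sketched without Aeron on the classpath. In this sketch SketchSession stands in for Aeron's ClientSession, ByteBuffer stands in for DirectBuffer, and the Header parameter is omitted; SessionAwareAdapter and ServiceSketch are hypothetical names. The point it illustrates is that the service does no decoding itself: it records the sender and forwards the bytes.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for Aeron's ClientSession; only an id is needed here.
interface SketchSession
{
    long id();
}

// Hypothetical adapter: it is told which session sent a message before decoding it.
final class SessionAwareAdapter
{
    private SketchSession session;
    private int messagesSeen;

    void setSession(final SketchSession session)
    {
        this.session = session;
    }

    void onMessage(final ByteBuffer buffer, final int offset, final int length)
    {
        // Decoding is omitted here; a real adapter would dispatch on the SBE template id.
        messagesSeen++;
    }

    SketchSession session()
    {
        return session;
    }

    int messagesSeen()
    {
        return messagesSeen;
    }
}

// Mirrors the shape of ClusteredService.onSessionMessage (ByteBuffer instead of
// DirectBuffer, no Header): the single entry point for every client message.
final class ServiceSketch
{
    private final SessionAwareAdapter adapter = new SessionAwareAdapter();

    void onSessionMessage(
        final SketchSession session, final long timestamp, final ByteBuffer buffer, final int offset, final int length)
    {
        adapter.setSession(session); // remember who sent this message, for replies
        adapter.onMessage(buffer, offset, length);
    }

    SessionAwareAdapter adapter()
    {
        return adapter;
    }
}
```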
Note that the service passes the sending client session to the adapter with a setSession(session) call, so the adapter knows which client sent each message.
We will use this later to return the current value.
Message adaptation from SBE to calls on the replicated state machine is performed by the RsmAdapter. The messages are encoded with Simple Binary Encoding, which generated the Encoders and Decoders for us.
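As a rough illustration of what such an adapter does, the sketch below reads a template id and then dispatches to the matching command. It deliberately does not use SBE's generated code: it assumes a simplified fixed little-endian layout (int templateId, long correlationId, long value), whereas a real SBE message header also carries block length, schema id and version. The template ids, the class name CommandAdapterSketch and the inline currentValue (standing in for the state machine) are all hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch of an RsmAdapter-style decoder. It uses a simplified,
// fixed layout instead of a real SBE message header:
//   int templateId | long correlationId | long value   (little-endian)
final class CommandAdapterSketch
{
    static final int ADD_TEMPLATE_ID = 1;      // hypothetical template ids
    static final int MULTIPLY_TEMPLATE_ID = 2;
    static final int MESSAGE_LENGTH = 4 + 8 + 8;

    private long currentValue;      // stand-in for the replicated state machine
    private long lastCorrelationId;

    // Helper to build a command in the same layout (used by callers and tests).
    static ByteBuffer encode(final int templateId, final long correlationId, final long value)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(MESSAGE_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(0, templateId).putLong(4, correlationId).putLong(12, value);
        return buffer;
    }

    // Read the template id first, then dispatch to the matching command.
    void onMessage(final ByteBuffer buffer, final int offset)
    {
        // duplicate() resets byte order to big-endian, so set it explicitly.
        final ByteBuffer in = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        final int templateId = in.getInt(offset);
        final long correlationId = in.getLong(offset + 4);
        final long value = in.getLong(offset + 12);

        switch (templateId)
        {
            case ADD_TEMPLATE_ID:
                currentValue += value;
                break;
            case MULTIPLY_TEMPLATE_ID:
                currentValue *= value;
                break;
            default:
                throw new IllegalArgumentException("unknown template id " + templateId);
        }
        lastCorrelationId = correlationId;
    }

    long currentValue()
    {
        return currentValue;
    }

    long lastCorrelationId()
    {
        return lastCorrelationId;
    }
}
```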
Along with commands that manipulate the replicated state machine, we also need the ability to restore the cluster state to the correct value using snapshots.
The ClusteredService provides snapshot support through its onTakeSnapshot method. The sample implements it by calling the state machine's takeSnapshot method and writing the result with Simple Binary Encoding; the listing is in the full source on GitHub.
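The write side of a snapshot can be sketched as "encode the state, then offer it until the publication accepts it". In this sketch RecordingPublication is a hypothetical stand-in for the ExclusivePublication passed to onTakeSnapshot, and the snapshot layout (int template id 100, then the long value, little-endian) is an assumption rather than the sample's SBE schema.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the ExclusivePublication given to onTakeSnapshot:
// it copies and keeps every buffer offered to it.
final class RecordingPublication
{
    final List<byte[]> offered = new ArrayList<>();

    long offer(final ByteBuffer buffer, final int offset, final int length)
    {
        final byte[] copy = new byte[length];
        for (int i = 0; i < length; i++)
        {
            copy[i] = buffer.get(offset + i);
        }
        offered.add(copy);
        return offered.size(); // positive result means "accepted"
    }
}

final class SnapshotWriterSketch
{
    static final int SNAPSHOT_TEMPLATE_ID = 100; // hypothetical
    static final int SNAPSHOT_LENGTH = 4 + 8;

    // Encode the state machine's single long into one snapshot message.
    static void takeSnapshot(final long currentValue, final RecordingPublication publication)
    {
        final ByteBuffer buffer = ByteBuffer.allocate(SNAPSHOT_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(0, SNAPSHOT_TEMPLATE_ID).putLong(4, currentValue);

        // A real publication can return a negative back-pressure code, so retry until accepted.
        while (publication.offer(buffer, 0, SNAPSHOT_LENGTH) < 0)
        {
            Thread.onSpinWait();
        }
    }
}
```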
To read the snapshot, the onStart method needs logic to load it; onStart receives an Image of the most recent snapshot, or null when there is none.
We reuse the adapter to decode it.
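The read side can be sketched the same way. Here the snapshot arrives as a list of fragments rather than an Aeron Image (an assumption made to keep the example self-contained), a null argument models "no snapshot", and the layout matches the hypothetical writer layout (int template id 100, then the long value). SnapshotLoaderSketch is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.List;

// Hypothetical sketch: the snapshot arrives as a list of fragments instead of
// an Aeron Image, using a simplified layout:
//   int templateId | long currentValue   (little-endian)
final class SnapshotLoaderSketch
{
    static final int SNAPSHOT_TEMPLATE_ID = 100; // hypothetical

    private long currentValue;
    private boolean loadedFromSnapshot;

    // Mirrors onStart: the snapshot argument is null when there is no snapshot.
    void onStart(final List<ByteBuffer> snapshotFragments)
    {
        if (snapshotFragments == null)
        {
            return; // fresh start, state stays at zero
        }
        for (final ByteBuffer fragment : snapshotFragments)
        {
            onFragment(fragment, 0);
        }
    }

    // Same fragment-handler style as for commands, applied to the snapshot message.
    void onFragment(final ByteBuffer buffer, final int offset)
    {
        final ByteBuffer in = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        if (in.getInt(offset) == SNAPSHOT_TEMPLATE_ID)
        {
            currentValue = in.getLong(offset + 4);
            loadedFromSnapshot = true;
        }
    }

    long currentValue()
    {
        return currentValue;
    }

    boolean loadedFromSnapshot()
    {
        return loadedFromSnapshot;
    }
}
```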
Now that we have a state machine in the cluster that accepts commands and can read and write snapshots, we need to return the current value to clients.
For this sample, this is done in the RsmAdapter, but in more elaborate scenarios it is typically handled by a dedicated object.
Earlier, in the ClusteredService, we captured the client Session in a session variable.
We use that Session object to reply directly to the cluster client; the listing is in the full source on GitHub.
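Replying through the captured session can be sketched as "encode into a reusable buffer, then offer to the session until accepted". ReplySession is a hypothetical stand-in for ClientSession.offer (negative result means not sent), and the CurrentValueEvent-like layout (int template id 3, long correlationId, long value, little-endian) is an assumption, not the sample's SBE schema.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for ClientSession.offer: returns a negative value when
// the message could not be sent (for example, back pressure).
interface ReplySession
{
    long offer(ByteBuffer buffer, int offset, int length);
}

final class CurrentValueReplierSketch
{
    static final int CURRENT_VALUE_EVENT_TEMPLATE_ID = 3; // hypothetical
    static final int EVENT_LENGTH = 4 + 8 + 8;

    // Reused for every reply to avoid allocating per message.
    private final ByteBuffer sendBuffer = ByteBuffer.allocate(EVENT_LENGTH).order(ByteOrder.LITTLE_ENDIAN);
    private ReplySession session;

    // Called by the service for each inbound message with the sending session.
    void setSession(final ReplySession session)
    {
        this.session = session;
    }

    // Encode a CurrentValueEvent-like message and send it to the captured session.
    void emitCurrentValue(final long correlationId, final long value)
    {
        sendBuffer.putInt(0, CURRENT_VALUE_EVENT_TEMPLATE_ID).putLong(4, correlationId).putLong(12, value);
        while (session.offer(sendBuffer, 0, EVENT_LENGTH) < 0)
        {
            Thread.onSpinWait(); // real code would typically idle with an idle strategy here
        }
    }
}
```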
Building the client
For this basic sample, the client sends a mixture of AddCommand and MultiplyCommand commands and receives the emitted CurrentValueEvent.
To receive messages, the client must implement EgressListener and pass it to the Aeron Cluster client.
Much like in the cluster, we need to adapt the inbound SBE messages from the cluster; the listing is in the full source on GitHub.
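A client-side adapter mirrors the cluster-side one: read the template id, decode the fields, act on them. This sketch takes the same leading parameters as EgressListener.onMessage but uses ByteBuffer instead of DirectBuffer and drops the Header. The event layout (int template id 3, long correlationId, long value, little-endian) and the class name EgressAdapterSketch are hypothetical; the log text is modeled on the client log lines further down.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side adapter with the leading parameters of
// EgressListener.onMessage (ByteBuffer instead of DirectBuffer, no Header).
final class EgressAdapterSketch
{
    static final int CURRENT_VALUE_EVENT_TEMPLATE_ID = 3; // hypothetical

    private final List<String> log = new ArrayList<>();

    void onMessage(
        final long clusterSessionId, final long timestamp, final ByteBuffer buffer, final int offset, final int length)
    {
        final ByteBuffer in = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        final int templateId = in.getInt(offset);
        if (templateId != CURRENT_VALUE_EVENT_TEMPLATE_ID)
        {
            log.add("Unknown template id " + templateId);
            return;
        }
        final long correlationId = in.getLong(offset + 4);
        final long value = in.getLong(offset + 12);
        log.add("Current value is " + value + "; correlation = " + correlationId);
    }

    List<String> log()
    {
        return log;
    }
}
```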
The onMessage method is only called when the egress is polled using clusterClient.pollEgress(). In the sample, for example, it is called while retrying an offer to the cluster.
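The "poll while offering" pattern can be sketched as a loop that keeps polling egress each time an offer is rejected, so responses keep flowing while the client waits. ClusterClientLike is a hypothetical stand-in modeled on AeronCluster.offer (negative result on failure) and AeronCluster.pollEgress; the spin-wait replaces the sample's idle strategy.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for the two AeronCluster calls used here.
interface ClusterClientLike
{
    long offer(ByteBuffer buffer, int offset, int length); // negative when not accepted

    int pollEgress(); // number of fragments processed
}

final class OfferWithPolling
{
    // Retry the offer until accepted, polling egress after each rejection so
    // onMessage callbacks (and session keepalives) are not starved.
    // Returns how many times egress was polled.
    static int offerAndPoll(final ClusterClientLike client, final ByteBuffer buffer, final int offset, final int length)
    {
        int polls = 0;
        while (client.offer(buffer, offset, length) < 0)
        {
            client.pollEgress();
            polls++;
            Thread.onSpinWait(); // the sample uses an idle strategy instead
        }
        return polls;
    }
}
```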
You can also call clusterClient.pollEgress() in your Duty Cycle.
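A duty cycle that polls egress might look roughly like this: each cycle does its work, and only sleeps when no work was done, which is the behaviour the note at the top describes for the 1 millisecond sleep. DutyCycleSketch is hypothetical; the IntSupplier would be something like clusterClient::pollEgress, and a real agent would normally use an Agrona idle strategy rather than Thread.sleep.

```java
import java.util.function.IntSupplier;

// Hypothetical duty cycle: each cycle polls the egress (for example via
// clusterClient::pollEgress) and sleeps only when no work was done.
final class DutyCycleSketch
{
    private final IntSupplier pollEgress;
    private final long idleSleepMillis;

    DutyCycleSketch(final IntSupplier pollEgress, final long idleSleepMillis)
    {
        this.pollEgress = pollEgress;
        this.idleSleepMillis = idleSleepMillis;
    }

    // One duty cycle; returns how many fragments the poll processed.
    int doWork()
    {
        final int workCount = pollEgress.getAsInt();
        if (workCount == 0)
        {
            try
            {
                Thread.sleep(idleSleepMillis); // idle back-off, e.g. 1 millisecond
            }
            catch (final InterruptedException ex)
            {
                Thread.currentThread().interrupt();
            }
        }
        return workCount;
    }

    // Run a fixed number of cycles and return the total work done.
    int run(final int cycles)
    {
        int totalWork = 0;
        for (int i = 0; i < cycles; i++)
        {
            totalWork += doWork();
        }
        return totalWork;
    }
}
```

Sleeping only on idle cycles keeps CPU use low between messages, at the cost of up to one sleep period of extra latency for a message that arrives while the thread is asleep.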
Running the Sample
Warning
With JDK 17+, you must run both the client and cluster with the following JVM argument: --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
Steps to run the samples:
- in the aeron-cookbook-code root folder, run the Gradle build with ./gradlew
- build the executable single jars for the Rsm Cluster and Rsm Client using ./gradlew uberRsmCluster uberRsmClient
- in one terminal window, move into the build/libs folder under the aeron-cookbook-code/cluster-rsm root folder
- run the cluster with java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar cluster-rsm-uber-cluster.jar
- in another terminal window, move into the build/libs folder under the aeron-cookbook-code/cluster-rsm root folder
- run the client with java --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -jar cluster-rsm-uber-client.jar
Both applications can be exited using Ctrl+C.
First run of cluster
Cluster directory is /.../.../.../aeron-cluster-0/cluster
10:50:19.893 [clustered-service] Cluster Node is in role LEADER
Client connects
Sample client log:
10:53:44.929 [main] Starting
10:53:44.931 [main] Adding 1; correlation = 1
10:53:44.934 [main] Adding 2; correlation = 2
10:53:44.936 [main] Adding 3; correlation = 3
10:53:44.937 [main] Adding 4; correlation = 4
10:53:44.939 [main] Adding 5; correlation = 5
10:53:44.940 [main] Adding 6; correlation = 6
10:53:44.943 [main] Current value is 1; correlation = 1
10:53:44.943 [main] Current value is 3; correlation = 2
10:53:44.943 [main] Current value is 6; correlation = 3
10:53:44.943 [main] Current value is 10; correlation = 4
10:53:44.943 [main] Current value is 15; correlation = 5
10:53:44.944 [main] Current value is 21; correlation = 6
...
10:53:45.027 [main] Adding 96; correlation = 96
10:53:45.028 [main] Adding 97; correlation = 97
10:53:45.029 [main] Adding 98; correlation = 98
10:53:45.030 [main] Current value is 7725; correlation = 96
10:53:45.030 [main] Multiplying by 2; correlation = 99
10:53:45.031 [main] Adding 100; correlation = 100
10:53:45.031 [main] Current value is 7822; correlation = 97
10:53:45.031 [main] Current value is 7920; correlation = 98
10:53:45.031 [main] Current value is 15840; correlation = 99
10:53:45.038 [main] Current value is 15940; correlation = 100
10:53:45.038 [main] Done
Sample cluster log:
10:53:44.938 [clustered-service] adding 1, value is now 1; correlation = 1
10:53:44.939 [clustered-service] adding 2, value is now 3; correlation = 2
10:53:44.939 [clustered-service] adding 3, value is now 6; correlation = 3
10:53:44.939 [clustered-service] adding 4, value is now 10; correlation = 4
10:53:44.940 [clustered-service] adding 5, value is now 15; correlation = 5
10:53:44.941 [clustered-service] adding 6, value is now 21; correlation = 6
...
10:53:45.029 [clustered-service] adding 96, value is now 7725; correlation = 96
10:53:45.030 [clustered-service] adding 97, value is now 7822; correlation = 97
10:53:45.030 [clustered-service] adding 98, value is now 7920; correlation = 98
10:53:45.030 [clustered-service] multiplying by 2, value is now 15840; correlation = 99
10:53:45.035 [clustered-service] adding 100, value is now 15940; correlation = 100
10:53:50.038 [clustered-service] Cluster Client Session closed
Restarting the cluster
In the sample, the cluster has deleteDirOnStart set to false, which means that restarting the cluster replays the messages already sent in, restoring the cluster state to its value at the end of the first run:
10:58:03.682 [clustered-service] Cluster Client Session opened
10:58:03.685 [clustered-service] adding 1, value is now 1; correlation = 1
10:58:03.685 [clustered-service] adding 2, value is now 3; correlation = 2
...
10:58:03.685 [clustered-service] adding 98, value is now 7920; correlation = 98
10:58:03.685 [clustered-service] multiplying by 2, value is now 15840; correlation = 99
10:58:03.685 [clustered-service] adding 100, value is now 15940; correlation = 100
10:58:03.685 [clustered-service] Cluster Client Session closed
10:58:03.685 [clustered-service] Cluster Node is in role LEADER
Then, when we connect the client, the state machine continues from the restored value:
11:03:43.150 [main] Starting
11:03:43.152 [main] Adding 1; correlation = 1
11:03:43.155 [main] Adding 2; correlation = 2
11:03:43.157 [main] Adding 3; correlation = 3
11:03:43.158 [main] Adding 4; correlation = 4
11:03:43.160 [main] Current value is 15941; correlation = 1
11:03:43.160 [main] Current value is 15943; correlation = 2
11:03:43.160 [main] Current value is 15946; correlation = 3
11:03:43.160 [main] Current value is 15950; correlation = 4
...
11:03:43.249 [main] Adding 96; correlation = 96
11:03:43.249 [main] Adding 97; correlation = 97
11:03:43.250 [main] Adding 98; correlation = 98
11:03:43.251 [main] Multiplying by 2; correlation = 99
11:03:43.252 [main] Current value is 71485; correlation = 96
11:03:43.252 [main] Current value is 71582; correlation = 97
11:03:43.252 [main] Adding 100; correlation = 100
11:03:43.252 [main] Current value is 71680; correlation = 98
11:03:43.254 [main] Current value is 143360; correlation = 99
11:03:43.254 [main] Current value is 143460; correlation = 100
11:03:43.254 [main] Done
The cluster logs then show:
11:03:43.139 [clustered-service] Cluster Client Session opened
11:03:43.157 [clustered-service] adding 1, value is now 15941; correlation = 1
11:03:43.158 [clustered-service] adding 2, value is now 15943; correlation = 2
11:03:43.158 [clustered-service] adding 3, value is now 15946; correlation = 3
11:03:43.159 [clustered-service] adding 4, value is now 15950; correlation = 4
...
11:03:43.250 [clustered-service] adding 96, value is now 71485; correlation = 96
11:03:43.250 [clustered-service] adding 97, value is now 71582; correlation = 97
11:03:43.251 [clustered-service] adding 98, value is now 71680; correlation = 98
11:03:43.253 [clustered-service] multiplying by 2, value is now 143360; correlation = 99
11:03:43.253 [clustered-service] adding 100, value is now 143460; correlation = 100
11:03:48.255 [clustered-service] Cluster Client Session closed
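The restart behaviour above relies on the state machine being deterministic: re-applying the same commands in the same order always produces the same value. The sketch below models that idea only; it is not Aeron's recovery code, and ReplaySketch and its Command type are hypothetical. Commands are appended to an in-memory log before being applied, and a "restarted" node rebuilds its state by replaying that log from zero.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of replay (not Aeron's implementation): every accepted
// command is logged before being applied, and a restarted node rebuilds its
// state by re-applying the log in order.
final class ReplaySketch
{
    static final class Command
    {
        final boolean multiply;
        final long operand;

        Command(final boolean multiply, final long operand)
        {
            this.multiply = multiply;
            this.operand = operand;
        }
    }

    private final List<Command> log = new ArrayList<>();
    private long value;

    // Live path: log first, then apply.
    long submit(final Command command)
    {
        log.add(command);
        value = apply(value, command);
        return value;
    }

    // Deterministic: the same input value and command always give the same result.
    static long apply(final long current, final Command command)
    {
        return command.multiply ? current * command.operand : current + command.operand;
    }

    // Restart path: start from zero and re-apply every logged command.
    static long replay(final List<Command> log)
    {
        long restored = 0;
        for (final Command command : log)
        {
            restored = apply(restored, command);
        }
        return restored;
    }

    List<Command> log()
    {
        return log;
    }

    long value()
    {
        return value;
    }
}
```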
Using Cluster Tool to take snapshots
For information on the cluster tool, see Operating Aeron Cluster.
Note
- This assumes that you have downloaded the latest Aeron jar and have moved into the directory holding cluster-mark.dat (as logged by the cluster process on start).
To request a snapshot, run the cluster tool with the following command (its output follows):
java --add-opens java.base/jdk.internal.misc=ALL-UNNAMED --add-opens java.base/java.util.zip=ALL-UNNAMED -cp aeron-all-*.jar io.aeron.cluster.ClusterTool . snapshot
...
INFO: Mark file exists: ./cluster-mark.dat
Member [0]: SNAPSHOT applied successfully
Once executed, the cluster log will show:
11:04:08.710 [clustered-service] taking snapshot
11:04:08.711 [clustered-service] taking snapshot with current value at 143460
This writes the snapshot state. When the cluster is restarted, it loads the snapshot instead of replaying the earlier messages one by one, as the cluster log shows:
11:05:44.489 [clustered-service] loading snapshot
11:05:44.489 [clustered-service] reading snapshot with current value at 143460
11:05:44.512 [clustered-service] Cluster Node is in role LEADER
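Conceptually, recovery with a snapshot means starting from the snapshot value and re-applying only the log entries recorded after the snapshot position. The sketch below is a simplified model of that idea, not Aeron's implementation; SnapshotRecoverySketch and its array-based log are hypothetical. With the snapshot taken at the end of the log, as in the run above, nothing is replayed and the value is the snapshot value.

```java
// Hypothetical, simplified recovery model (not Aeron's implementation): state
// is restored from the snapshot value, then only log entries recorded after
// the snapshot position are re-applied.
final class SnapshotRecoverySketch
{
    static long recover(
        final long snapshotValue,
        final int snapshotPosition,
        final long[] operands,
        final boolean[] isMultiply)
    {
        long value = snapshotValue;
        for (int i = snapshotPosition; i < operands.length; i++)
        {
            value = isMultiply[i] ? value * operands[i] : value + operands[i];
        }
        return value;
    }
}
```

For example, with a log of add 1, add 2, add 3, multiply by 2, a full replay from zero and a recovery from a snapshot of 3 taken after the first two entries both end at 12.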