Gateway Patterns
Aeron Cluster provides the extension points required to build both Active/Passive and Active/Active cluster clients. This capability is typically used for gateways, for example an Artio-based FIX gateway. This page outlines a ProcessManager that provides this capability to Aeron Cluster clients.
Cluster Messages
First, define a message that registers a cluster client. At a minimum, this message must include a group identifier so that the ProcessManager can determine the required process layout. Other useful fields include the minimum group size and the availability pattern (Active/Active or Active/Passive).
Expressed in SBE, the ProcessManager messages may look something like this:
```xml
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
                   package="com.aeroncookbook.sbe"
                   id="688"
                   version="1"
                   semanticVersion="0.1"
                   description="Gateway Pattern examples"
                   byteOrder="littleEndian">
    <types>
        <!-- standard SBE message header; the SBE tool expects a messageHeader composite (the default headerType) -->
        <composite name="messageHeader" description="Message identifiers and length of message root">
            <type name="blockLength" primitiveType="uint16"/>
            <type name="templateId" primitiveType="uint16"/>
            <type name="schemaId" primitiveType="uint16"/>
            <type name="version" primitiveType="uint16"/>
        </composite>
        <enum name="ProcessAvailabilityEnum" encodingType="int32">
            <validValue name="ACTIVE_ACTIVE">1</validValue>
            <validValue name="ACTIVE_PASSIVE">2</validValue>
        </enum>
        <enum name="ProcessManagerState" encodingType="int32">
            <validValue name="ACTIVE">1</validValue>
            <validValue name="OFFLINE">2</validValue>
        </enum>
    </types>
    <sbe:message name="RegistrationCommand" id="1"
                 description="Registers a cluster client with the process manager">
        <field name="minGroupSize" id="1" type="int32"/>
        <field name="groupId" id="2" type="int32"/>
        <field name="availability" id="3" type="ProcessAvailabilityEnum"/>
    </sbe:message>
    <sbe:message name="ProcessManagerStateEvent" id="2"
                 description="Current state for a given process">
        <field name="state" id="1" type="ProcessManagerState"/>
    </sbe:message>
</sbe:messageSchema>
```
Once a cluster client has connected to the cluster, it should submit a RegistrationCommand message. All processes within the same process group are assumed to be identical, and to submit the same parameters in their RegistrationCommand.
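The following sketch illustrates what the client actually submits. It hand-encodes a RegistrationCommand using the standard SBE wire layout: an 8-byte message header (blockLength, templateId, schemaId and version, each a little-endian uint16), followed by the three int32 fields in declaration order. RegistrationEncoding and its constants are hypothetical names. In practice you would use the SBE-generated RegistrationCommandEncoder rather than writing offsets by hand.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical hand-rolled encoder mirroring the SBE layout of the schema above.
// A real client would use the SBE-generated encoder instead.
final class RegistrationEncoding
{
    static final int HEADER_LENGTH = 8;   // blockLength, templateId, schemaId, version (uint16 each)
    static final int BLOCK_LENGTH = 12;   // minGroupSize, groupId, availability (int32 each)
    static final int TEMPLATE_ID = 1;     // RegistrationCommand id in the schema
    static final int SCHEMA_ID = 688;
    static final int SCHEMA_VERSION = 1;

    // ProcessAvailabilityEnum values from the schema
    static final int ACTIVE_ACTIVE = 1;
    static final int ACTIVE_PASSIVE = 2;

    // Writes header + body at offset and returns the number of bytes written.
    static int encode(ByteBuffer buffer, int offset, int minGroupSize, int groupId, int availability)
    {
        // duplicate() shares the content, so the little-endian view writes into the caller's buffer
        final ByteBuffer buf = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        buf.putShort(offset, (short)BLOCK_LENGTH);
        buf.putShort(offset + 2, (short)TEMPLATE_ID);
        buf.putShort(offset + 4, (short)SCHEMA_ID);
        buf.putShort(offset + 6, (short)SCHEMA_VERSION);

        final int body = offset + HEADER_LENGTH;
        buf.putInt(body, minGroupSize);
        buf.putInt(body + 4, groupId);
        buf.putInt(body + 8, availability);
        return HEADER_LENGTH + BLOCK_LENGTH;
    }
}
```

The client would then pass the encoded bytes to the cluster, for example by wrapping the ByteBuffer in an Agrona UnsafeBuffer and calling AeronCluster.offer. That method returns a negative value when the message could not be sent, for instance under back pressure, so a client would typically retry.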
Cluster Logic
With RegistrationCommand messages now arriving, the cluster has what it needs to build the required ProcessManager state.
Add an object, the ProcessManager, that processes RegistrationCommand messages. It keeps track of every registration and its associated ClientSession.
```java
// within a ClusteredService
@Override
public void onSessionMessage(
    ClientSession session, long timestamp, DirectBuffer buffer, int offset, int length, Header header)
{
    // adapt (decode) the messages, and accept the registration message
    final RegistrationCommand decodedRegistrationCommand = ...
    processManager.registerClient(session, decodedRegistrationCommand);
}
```
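The elided decoding step in onSessionMessage could look roughly like this hand-written equivalent. It assumes the SBE layout of the schema above. The sketch reads templateId from the message header and decodes the body only when that id identifies a RegistrationCommand (template id 1). RegistrationDecoding and its Registration record are hypothetical. Normally the SBE-generated MessageHeaderDecoder and RegistrationCommandDecoder do this work, and they also honour blockLength and version for schema evolution, which this sketch ignores.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical hand-written decoder; uses a record, so it needs Java 16 or later.
final class RegistrationDecoding
{
    static final int HEADER_LENGTH = 8;
    static final int REGISTRATION_COMMAND_TEMPLATE_ID = 1;

    // Plain value object standing in for the decoded RegistrationCommand.
    record Registration(int minGroupSize, int groupId, int availability) {}

    // Returns the decoded registration, or null if the message is a different template.
    static Registration tryDecode(ByteBuffer buffer, int offset)
    {
        final ByteBuffer buf = buffer.duplicate().order(ByteOrder.LITTLE_ENDIAN);
        final int templateId = buf.getShort(offset + 2) & 0xFFFF; // templateId is a uint16
        if (templateId != REGISTRATION_COMMAND_TEMPLATE_ID)
        {
            return null;
        }
        // blockLength and version are ignored here; a generated decoder would use them
        final int body = offset + HEADER_LENGTH;
        return new Registration(buf.getInt(body), buf.getInt(body + 4), buf.getInt(body + 8));
    }
}
```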
Within registerClient, capture the session and apply the required logic. For example:
- if it is the first registration for a given group, the group is Active/Active, and there is no minimum group size, the ProcessManager can simply respond with a ProcessManagerStateEvent with its state set to ACTIVE
- if it is the first registration for a given group, the group is Active/Passive, and the group size is two, the ProcessManager can again respond with a ProcessManagerStateEvent with its state set to ACTIVE
- if it is the second registration for a given group, the group is Active/Passive, and the group size is two, the ProcessManager can respond with a ProcessManagerStateEvent with its state set to OFFLINE
- if it is the first registration for a given group, the group is Active/Active, and the minimum group size is four, the ProcessManager can respond with a ProcessManagerStateEvent with its state set to OFFLINE. A cluster timer could be started to verify that the group's registrations complete within a given timeframe. Once enough processes in the group have joined, a ProcessManagerStateEvent with its state set to ACTIVE could be sent to every process in the group.
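The example rules can be condensed into a small state machine. This sketch rests on two assumptions. First, each process is identified by its cluster session id (as returned by ClientSession.id()). Second, registerClient returns the ProcessManagerStateEvents to send as a map from session id to state. The class, enum and method shapes are hypothetical.

Under these rules, an Active/Passive group makes its first registrant ACTIVE and every later one OFFLINE. An Active/Active group holds its members OFFLINE until minGroupSize processes have registered, then activates them all. The registration timeout timer is not shown.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the registration rules; session ids stand in for ClientSession.
final class RegistrationRules
{
    enum Availability { ACTIVE_ACTIVE, ACTIVE_PASSIVE }
    enum State { ACTIVE, OFFLINE }

    private final Map<Integer, List<Long>> sessionsByGroup = new HashMap<>();
    private final Map<Long, State> stateBySession = new HashMap<>();

    // Returns the state events to send, keyed by session id.
    Map<Long, State> registerClient(long sessionId, int groupId, int minGroupSize, Availability availability)
    {
        final List<Long> group = sessionsByGroup.computeIfAbsent(groupId, k -> new ArrayList<>());
        group.add(sessionId);
        final Map<Long, State> events = new LinkedHashMap<>();

        if (availability == Availability.ACTIVE_PASSIVE)
        {
            // first registrant is ACTIVE, later ones wait as OFFLINE (passive)
            final State state = group.size() == 1 ? State.ACTIVE : State.OFFLINE;
            stateBySession.put(sessionId, state);
            events.put(sessionId, state);
        }
        else if (group.size() < minGroupSize)
        {
            // Active/Active below its minimum size: hold this process OFFLINE
            stateBySession.put(sessionId, State.OFFLINE);
            events.put(sessionId, State.OFFLINE);
        }
        else
        {
            // minimum reached (or none configured): activate every member not yet ACTIVE
            for (final long member : group)
            {
                if (stateBySession.get(member) != State.ACTIVE)
                {
                    stateBySession.put(member, State.ACTIVE);
                    events.put(member, State.ACTIVE);
                }
            }
        }
        return events;
    }
}
```

Returning the events, rather than offering them directly, keeps the decision logic free of I/O and easy to test. The caller would offer each event through the matching ClientSession.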
The cluster clients themselves may need to apply specific logic depending on whether they are ACTIVE or OFFLINE. For example, a FIX gateway in an Active/Passive pair that is set to OFFLINE would not accept client connections.
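On the gateway side, the state can be as simple as a field that is updated from each ProcessManagerStateEvent and checked before accepting a connection. The class and method names below are hypothetical. A real Artio gateway would plug this check into its own connection-acceptance logic. Starting in OFFLINE is a design assumption, so a gateway refuses connections until the ProcessManager has activated it.

```java
// Hypothetical gateway-side state holder driven by ProcessManagerStateEvent.
final class GatewayConnectivity
{
    enum State { ACTIVE, OFFLINE }

    // safe default: refuse connections until the ProcessManager says ACTIVE
    private State state = State.OFFLINE;

    // called when a ProcessManagerStateEvent arrives from the cluster
    void onProcessManagerStateEvent(State newState)
    {
        state = newState;
    }

    // consulted before accepting a new client connection
    boolean acceptNewConnection()
    {
        return state == State.ACTIVE;
    }
}
```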
To communicate with a specific cluster client, the ProcessManager uses that client's ClientSession. Because the ProcessManager holds the ClientSession captured at registration, it can send a message directly to a specific session, even if that session was not the one that sent the message currently being processed.
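```java
private long sendToSession(ClientSession clientSession, DirectBuffer buffer, int offset, int length)
{
    // clientSession may be any registered session, not only the one that sent the current message.
    // A negative result means the offer failed (for example, back pressure or a disconnected client),
    // so return it to let the caller decide whether to retry.
    return clientSession.offer(buffer, offset, length);
}
```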
When other components in the cluster need to communicate with a given process group, the ProcessManager could offer two methods. One sends to the Active cluster client(s) in the group, and the other broadcasts to all cluster clients in the group.
```java
public void sendToGroup(int processGroup, DirectBuffer buffer, int offset, int length)
{
    // look up the active node(s) within the given process group, get their ClientSession(s) and offer
}

public void broadcastToGroup(int processGroup, DirectBuffer buffer, int offset, int length)
{
    // get all the ClientSession(s) for the given process group, and offer to each
}
```
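One way to fill in sendToGroup and broadcastToGroup is to keep each group's members in a map keyed by process group. In this sketch, Member is a hypothetical stand-in for a registered ClientSession plus its current state. Its offer(byte[]) method stands in for ClientSession.offer(DirectBuffer, int, int) and keeps the same convention: a negative result means the message was not sent. Both methods return how many offers succeeded, so the caller can react to failures.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of group messaging inside the ProcessManager.
final class GroupMessaging
{
    interface Member
    {
        boolean isActive();

        long offer(byte[] message); // negative result = not sent, as with ClientSession.offer
    }

    private final Map<Integer, List<Member>> membersByGroup = new HashMap<>();

    void register(int processGroup, Member member)
    {
        membersByGroup.computeIfAbsent(processGroup, k -> new ArrayList<>()).add(member);
    }

    // offer only to the ACTIVE member(s); returns the number of successful offers
    int sendToGroup(int processGroup, byte[] message)
    {
        int sent = 0;
        for (final Member member : membersByGroup.getOrDefault(processGroup, List.of()))
        {
            if (member.isActive() && member.offer(message) >= 0)
            {
                sent++;
            }
        }
        return sent;
    }

    // offer to every member of the group, ACTIVE or OFFLINE
    int broadcastToGroup(int processGroup, byte[] message)
    {
        int sent = 0;
        for (final Member member : membersByGroup.getOrDefault(processGroup, List.of()))
        {
            if (member.offer(message) >= 0)
            {
                sent++;
            }
        }
        return sent;
    }
}
```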
To manage the removal of cluster clients properly, the ProcessManager needs to be notified from onSessionClose:
```java
// within a ClusteredService
@Override
public void onSessionClose(ClientSession session, long timestamp, CloseReason closeReason)
{
    ...
    processManager.closeSession(session);
}
```
When a cluster client is removed, the rules need to be re-evaluated for its group. For example, if the ACTIVE node of a two-node ACTIVE_PASSIVE group fails or disconnects, the remaining passive (OFFLINE) node could be sent a ProcessManagerStateEvent with its state set to ACTIVE.
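A minimal sketch of closeSession for an ACTIVE_PASSIVE group follows. When the ACTIVE member disconnects, it promotes the longest-registered remaining member and returns that session id, which should then be sent a ProcessManagerStateEvent set to ACTIVE. Session ids stand in for ClientSession. The class name and the "promote the oldest remaining member" policy are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical failover handling for ACTIVE_PASSIVE groups.
final class SessionCloseHandling
{
    private final Map<Integer, List<Long>> sessionsByGroup = new HashMap<>();
    private final Map<Integer, Long> activeByGroup = new HashMap<>();
    private final Map<Long, Integer> groupBySession = new HashMap<>();

    void register(long sessionId, int groupId)
    {
        sessionsByGroup.computeIfAbsent(groupId, k -> new ArrayList<>()).add(sessionId);
        groupBySession.put(sessionId, groupId);
        activeByGroup.putIfAbsent(groupId, sessionId); // first to register is ACTIVE
    }

    // Returns the session id to promote to ACTIVE, or -1 if no state change is needed.
    long closeSession(long sessionId)
    {
        final Integer groupId = groupBySession.remove(sessionId);
        if (groupId == null)
        {
            return -1; // not a registered process
        }
        final List<Long> group = sessionsByGroup.get(groupId);
        group.remove(Long.valueOf(sessionId)); // remove by value, not by index

        final Long active = activeByGroup.get(groupId);
        if (active == null || active != sessionId)
        {
            return -1; // a passive member left; the ACTIVE member is unaffected
        }
        if (group.isEmpty())
        {
            activeByGroup.remove(groupId);
            sessionsByGroup.remove(groupId);
            return -1; // nobody left to promote
        }
        final long promoted = group.get(0); // oldest remaining registration
        activeByGroup.put(groupId, promoted);
        return promoted;
    }
}
```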
Complexities with Active/Active clients
In some scenarios, Active/Active gateways need to process messages in parallel to offer zero-downtime cluster client failover. The outbound flow (cluster to gateways) is simple and works as described above. The inbound flow (gateways to cluster) typically requires deduplication. The first response to a given message from any node in the process group is accepted, and the copies submitted by the other Active nodes in the same group are ignored.
The following rules must be strictly followed:
- the gateway process must be fully deterministic: it cannot use threads or perform external I/O, and its logic must always produce the same output for the same input. Messages must be processed and responded to sequentially.
- either each gateway must maintain a monotonically incrementing sequence (all gateways starting from the same number), or a cluster-assigned, monotonically incrementing request sequence must be stamped on the outbound message.
When the Active/Active gateways process a message, they must all submit identical messages to the cluster. Because they are deterministic, they all respond with the same sequence. The receiving logic can then track the last received sequence and reject any message whose sequence is not exactly one greater than the last received value.
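A minimal sketch of that receiving logic follows. It tracks one sequence per process group and accepts a message only when its sequence is exactly one more than the last accepted value. The class name and the choice to start sequences at 1 are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical deduplicator for messages submitted by Active/Active gateways.
final class SequenceDeduplicator
{
    // last accepted sequence per process group; sequences assumed to start at 1
    private final Map<Integer, Long> lastSequenceByGroup = new HashMap<>();

    boolean accept(int processGroup, long sequence)
    {
        final long last = lastSequenceByGroup.getOrDefault(processGroup, 0L);
        if (sequence != last + 1)
        {
            return false; // duplicate (sequence <= last) or gap (sequence > last + 1)
        }
        lastSequenceByGroup.put(processGroup, sequence);
        return true;
    }
}
```

The cluster log is totally ordered and the clustered service is deterministic. As a result, every cluster member accepts the same first copy of each message and rejects the same duplicates.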