Building the Server
Building the server is done in 3 steps:
- The server process
- The server agent
- The server adapter
Server Process
The main
method of the Server process is below. The key parts are:
- Create a ShutdownSignalBarrier, which is used to close the process once the sample has completed (line 1)
- Create a Media Driver. Note a server specific folder is used to ensure the client and server do not share the media driver folder. (lines 5-10)
- Create the Aeron API (lines 13-15)
- Construct the Server agent and start it running on a thread (lines 18-21)
- Await the shutdown signal barrier to be signalled (line 24)
- Close the Server Agent, Aeron and Media Driver (lines 26-28)
See GitHub for the complete source.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 | final IdleStrategy idleStrategy = new SleepingMillisIdleStrategy();
final ShutdownSignalBarrier barrier = new ShutdownSignalBarrier();
//construct Media Driver, cleaning up media driver folder on start/stop
final MediaDriver.Context mediaDriverCtx = new MediaDriver.Context()
.aeronDirectoryName(CommonContext.getAeronDirectoryName() + "-server")
.dirDeleteOnStart(true)
.dirDeleteOnShutdown(true)
.threadingMode(ThreadingMode.SHARED);
final MediaDriver mediaDriver = MediaDriver.launchEmbedded(mediaDriverCtx);
//construct Aeron, pointing at the media driver's folder
final Aeron.Context aeronCtx = new Aeron.Context()
.aeronDirectoryName(mediaDriver.aeronDirectoryName());
final Aeron aeron = Aeron.connect(aeronCtx);
//Construct the server agent
ServerAgent serverAgent = new ServerAgent(aeron, barrier);
AgentRunner serverAgentRunner = new AgentRunner(idleStrategy,
Throwable::printStackTrace, null, serverAgent);
AgentRunner.startOnThread(serverAgentRunner);
//Await shutdown signal
barrier.await();
CloseHelper.quietClose(serverAgentRunner);
CloseHelper.quietClose(aeron);
CloseHelper.quietClose(mediaDriver);
|
Server Agent
In the server, the primary agent is very simple - all it does in doWork
is poll the subscription, and in onStart
it constructs the server's subscription. See GitHub for the complete source.
1
2
3
4
5
6
7
8
9
10
11
12
13 | @Override
public void onStart()
{
log.info("Server starting");
subscription = aeron.addSubscription(Constants.SERVER_URI,
Constants.RPC_STREAM);
}
@Override
public int doWork() throws Exception
{
return subscription.poll(serverAdapter, 1);
}
|
The adapter contains the majority of logic for the server.
Server Adapter
There are three key methods within the serverAdapter:
- The
onFragment
method, which is called from the subscription polling in the Agent above
- The
blockingOpenConnection
method, which opens the outbound connection to the client so that the response can be sent.
- The
respond
method, which takes the input parameter and makes it uppercase
The full source code is on GitHub
onFragment
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27 | public void onFragment(DirectBuffer buffer, int offset, int length, Header header)
{
headerDecoder.wrap(buffer, offset);
final int headerLength = headerDecoder.encodedLength();
final int actingLength = headerDecoder.blockLength();
final int actingVersion = headerDecoder.version();
switch (headerDecoder.templateId())
{
case RpcConnectRequestDecoder.TEMPLATE_ID:
connectRequest.wrap(buffer, offset + headerLength,
actingLength, actingVersion);
final int streamId = connectRequest.returnConnectStream();
final String uri = connectRequest.returnConnectUri();
blockingOpenConnection(streamId, uri);
break;
case RpcRequestMethodDecoder.TEMPLATE_ID:
requestMethod.wrap(buffer, offset + headerLength,
actingLength, actingVersion);
final String parameters = requestMethod.parameters();
final String correlation = requestMethod.correlation();
respond(parameters, correlation);
break;
default:
break;
}
}
|
Key lines are as follows:
- Lines 11-16, which accept the Connect Request from the client, and invoke the blockingOpenConnection
- Lines 18-23, which accept the inbound RequestMethod and responds.
blockingOpenConnection
| private void blockingOpenConnection(int streamId, String uri)
{
log.info("received connect request with response URI {} stream {}",
uri, streamId);
publication = aeron.addExclusivePublication(uri, streamId);
while (!publication.isConnected())
{
aeron.context().idleStrategy().idle();
}
}
|
Key lines are as follows:
- Line 5 which opens an exclusive publication to the client, using the values provided in the
ConnectRequest
- Lines 6-9 which blocks the process until the publication is connected.
respond
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | private void respond(String parameters, String correlation)
{
final String returnValue = parameters.toUpperCase();
log.info("responding on correlation {} with value {}", correlation, returnValue);
responseEvent.wrapAndApplyHeader(buffer, 0, headerEncoder);
responseEvent.result(returnValue);
responseEvent.correlation(correlation);
int retries = 3;
do
{
long result = publication.offer(buffer, 0, headerEncoder.encodedLength() + responseEvent.encodedLength());
if (result > 0)
{
//shutdown once the result is sent
barrier.signal();
break;
} else
{
log.warn("aeron returned {}", result);
}
}
while (--retries > 0);
}
|
Key lines are as follows:
- Lines 3-8, which prepares the response data, keeping the correlation
- Line 13, which attempts the offer to the publication (lines 12-25 wrap this in 3 retries).
- Line 17, which signals the shutdown signal barrier that the process can terminate
Extending for multiple clients
Adding support for multiple clients can be done as follows:
- Firstly, the process will need some other condition to exit.
- When a connect request are received, take the
header.sessionId
and associate it with the constructed publication in an Int2ObjectHashMap
or similar
- When method requests come in, lookup the correct publication to use using the
header.sessionId
.
- Be careful to not leak publications or subscriptions as this can result in errors (for example, Aeron counters run out of capacity around 8192 values)