Multi-host Sample¶
A complete sample that replays recorded data from an Archive Host in one Docker container to an Archive Client in a different Docker container. Running in separate containers forces different IP addresses and requires correct wiring of both the host and the client.
The code is on GitHub.
Overview¶
graph LR;
Host[Archive Host]-- replay -->Client[Archive Client];
The sample operates as follows:
- at start of the Archive Host, it constructs an Archiving Media Driver and Aeron client. This Archiving Media Driver listens on two ports, 17000 and 17001, for the Control Request and Recording Events channels respectively.
- with the Aeron object ready, the Archive Host constructs a spied aeron:ipc subscription on stream 100.
- at the start of the Archive Client, it constructs a normal Media Driver and Aeron client.
- the Archive Client then creates an instance of AeronArchive and connects it to the Archive Host. It will wait until the Archive Host is ready.
- with a connected AeronArchive, the Archive Client asks the remote archive to list its recordings and finds the one matching the aeron:ipc subscription on stream 100. It then creates a local subscription and requests the remote Archive Host to begin a replay to it.
Building the Archive Host¶
The most important part of the Archive Host is the setup of the Archiving Media Driver and the starting of the archive recording.
Constructing ArchivingMediaDriver¶
There are two key channels configured - the control channel and recording events channel.
These must bind to the Archive Host's IP address. In the Docker container the value for host is 10.1.0.2, the controlChannelPort is 17000 and the recordingEventsPort is 17001. These values come from the docker-compose.yml file.
Also of note, the Media Driver is told that spies simulate connections. This allows the aeron:ipc stream to be published to without an explicit subscription.
This code is in ArchiveHostAgent.java on GitHub.
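As a rough illustration of the two channels described above, the sketch below builds channel URI strings in the same shape as the controlChannel and eventsChannel values that appear in the ArchiveTool mark file header later on this page. The class and method names (HostChannels, udpEndpoint, dynamicControl) are hypothetical and are not taken from the sample; the host and port values are the ones quoted from docker-compose.yml.

```java
/** Hypothetical helper; names are illustrative and not part of the sample code. */
final class HostChannels {
    private HostChannels() {
    }

    /** UDP channel bound to a specific host and port, as used for the control channel. */
    static String udpEndpoint(String host, int port) {
        return "aeron:udp?endpoint=" + host + ":" + port;
    }

    /** UDP channel with a dynamic control address, as used for the recording events channel. */
    static String dynamicControl(String host, int port) {
        return "aeron:udp?control-mode=dynamic|control=" + host + ":" + port;
    }

    public static void main(String[] args) {
        String host = "10.1.0.2"; // Archive Host address from docker-compose.yml
        System.out.println(udpEndpoint(host, 17000));    // control channel
        System.out.println(dynamicControl(host, 17001)); // recording events channel
    }
}
```

Keeping the host and ports in one place like this makes it harder for the host and client configuration to drift apart, since both sides must agree on these values.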
Starting a recording¶
To start a recording, we need to connect an instance of AeronArchive to the ArchivingMediaDriver using the same host and port information above. The responses will flow over controlResponseChannel, which is set to use an ephemeral port over UDP in the sample.
With the AeronArchive connected, a local publication to aeron:ipc on stream 100 is created.
The archive is then requested to start the recording on the local ArchivingMediaDriver for the given aeron:ipc stream.
Since this is an asynchronous operation, we then keep track of the recording getting started by observing the Aeron counters.
Next, the recording ID is captured and logged and finally, the activity and events listeners are wired up.
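Because the recording starts asynchronously, the host has to poll until the recording's counter shows up. The sketch below shows only the general shape of such a wait, using plain Java: the lookup is passed in as a function, and CounterAwait, awaitCounterId and the -1 sentinel are assumptions made for this illustration rather than the sample's code or Aeron's API.

```java
import java.util.function.IntSupplier;

/** Hypothetical polling helper; it only mimics the shape of waiting on a counter. */
final class CounterAwait {
    /** Sentinel meaning "no counter yet" (an assumption for this sketch). */
    static final int NULL_COUNTER_ID = -1;

    private CounterAwait() {
    }

    /**
     * Calls the lookup repeatedly until it yields a counter id.
     * Throws IllegalStateException if the timeout elapses first.
     */
    static int awaitCounterId(IntSupplier lookup, long timeoutNs) {
        final long deadlineNs = System.nanoTime() + timeoutNs;
        int counterId = lookup.getAsInt();
        while (counterId == NULL_COUNTER_ID) {
            if (System.nanoTime() - deadlineNs > 0) {
                throw new IllegalStateException("recording counter not found before timeout");
            }
            Thread.yield(); // be polite to other threads while spinning
            counterId = lookup.getAsInt();
        }
        return counterId;
    }
}
```

In a real agent this wait would normally be spread across duty cycle invocations rather than run as a blocking loop; the loop form is used here only to keep the sketch short.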
Building a client¶
The client makes use of a standard Media Driver and Aeron client, without any specific configuration. The interesting part comes once the Aeron client is connected.
Connecting to the remote Aeron Archive¶
The example code connects to the remote Aeron Archive from within the duty cycle of the agent, so this code is expected to be called many times during operation.
First, the code gets hold of an AsyncConnect object, which is used for asynchronous connections to an ArchivingMediaDriver. Note that the controlRequestChannel and recordingEventsChannel both point to the Archive Host, and use the ports defined by the host.
The control response channel (to which the remote ArchivingMediaDriver sends responses) points to the local machine this time. The AsyncConnect is expected to be polled within a duty cycle. If the remote host is not available before the timeout, a TimeoutException is thrown.
Once the AsyncConnect object returns a non-null AeronArchive, the code can proceed. The code then calls getRecordingId, which requests the remote ArchivingMediaDriver to list all recordings for the given URI (aeron:ipc in this case) and stream (100 in this case).
With the recording ID known, the code then constructs a local UDP subscription on an ephemeral port, and requests the remote ArchivingMediaDriver to replay the recorded stream to it, from position 0 to the end of the stream.
With this all in place, the local subscription can then be polled as normal.
The full code for this is found in ArchiveClientAgent.java on GitHub.
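The client steps described above (poll the connect until it yields an archive, look up the recording, then replay) fit naturally into a small state machine driven by the duty cycle. The following is a minimal sketch of that idea in plain Java, not the code from ArchiveClientAgent.java: ClientDutyCycleSketch, its states, the two supplier parameters and the -1 sentinel are all hypothetical stand-ins for the Aeron calls.

```java
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical duty-cycle sketch; the real logic lives in ArchiveClientAgent.java. */
final class ClientDutyCycleSketch {
    enum State { CONNECTING, FINDING_RECORDING, REPLAYING }

    /** Sentinel for "recording not listed yet" (an assumption for this sketch). */
    static final long NOT_FOUND = -1L;

    private final Supplier<Object> connectPoll; // stands in for polling the connect; null until connected
    private final LongSupplier recordingQuery;  // stands in for listing recordings for aeron:ipc, stream 100
    private State state = State.CONNECTING;
    private long recordingId = NOT_FOUND;

    ClientDutyCycleSketch(Supplier<Object> connectPoll, LongSupplier recordingQuery) {
        this.connectPoll = connectPoll;
        this.recordingQuery = recordingQuery;
    }

    /** One pass of the duty cycle; returns 1 when progress was made, otherwise 0. */
    int doWork() {
        switch (state) {
            case CONNECTING:
                if (connectPoll.get() != null) {
                    state = State.FINDING_RECORDING;
                    return 1;
                }
                return 0;
            case FINDING_RECORDING:
                recordingId = recordingQuery.getAsLong();
                if (recordingId != NOT_FOUND) {
                    // A real agent would add the local subscription and request the replay here.
                    state = State.REPLAYING;
                    return 1;
                }
                return 0;
            default:
                return 0; // a real agent would poll the replay subscription here
        }
    }

    State state() {
        return state;
    }

    long recordingId() {
        return recordingId;
    }
}
```

Each call to doWork does a small, non-blocking amount of work, which is what allows the connect attempt to be retried on every duty cycle until the Archive Host becomes available.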
Running the sample¶
Note
It is assumed the reader is familiar with Docker and has Docker installed.
- First, you will need a recent version of Docker.
- Dedicate at least 4GB of memory and 4 to 8 cores to Docker.
- You will need access to Docker Hub to download the azul/zulu-openjdk-debian:17 container image.
- Then, you will need to build the shadow jars for the archive client and archive host. To do this, run gradle in the sample source code top level folder (typically aeron-cookbook-code) with no arguments.
- Finally, move to the archive-multi-host folder, and run docker-compose up -d.
Execution Output¶
Info
This log output is from running docker-compose up. After the first few messages were sent/received, the Docker containers were killed.
Inspecting the host¶
If you ran the sample with the -d option, you can now enter one of the running containers and take a look at the running process. First, open a bash terminal:
docker exec -it archive-multi-host-archive-host-1 bash
Once inside the container, AeronStat can be launched with:
java -cp /root/aeron/aeron-all-*.jar io.aeron.samples.AeronStat watch=false
Sample output (notice the archive-specific ports and the Archive Control Sessions counter):
13:03:51 - Aeron Stat (CnC v0.2.0), pid 996, heartbeat age 231ms
======================================================================
0: 112,032 - Bytes sent
1: 83,168 - Bytes received
2: 0 - Failed offers to ReceiverProxy
3: 0 - Failed offers to SenderProxy
4: 0 - Failed offers to DriverConductorProxy
5: 0 - NAKs sent
6: 0 - NAKs received
7: 1,289 - Status Messages sent
8: 1,716 - Status Messages received
9: 3,378 - Heartbeats sent
10: 2,566 - Heartbeats received
11: 0 - Retransmits sent
12: 0 - Flow control under runs
13: 0 - Flow control over runs
14: 0 - Invalid packets
15: 0 - Errors
16: 0 - Short sends
17: 0 - Failed attempts to free log buffers
18: 0 - Sender flow control limits, i.e. back-pressure events
19: 0 - Unblocked Publications
20: 0 - Unblocked Control Commands
21: 0 - Possible TTL Asymmetry
22: 0 - ControllableIdleStrategy status
23: 0 - Loss gap fills
24: 0 - Client liveness timeouts
25: 0 - Resolution changes: driverName=null hostname=archive-host
26: 102,617,067 - Conductor max cycle time doing its work in ns: SHARED
27: 0 - Conductor work cycle exceeded threshold count: threshold=1000000000ns SHARED
28: 102,484,357 - Sender max cycle time doing its work in ns: SHARED
29: 0 - Sender work cycle exceeded threshold count: threshold=1000000000ns SHARED
30: 102,592,566 - Receiver max cycle time doing its work in ns: SHARED
31: 0 - Receiver work cycle exceeded threshold count: threshold=1000000000ns SHARED
32: 131,376 - NameResolver max time in ns
33: 0 - NameResolver exceeded threshold count
34: 1,685,711,031,511 - client-heartbeat: 1
35: 24,008,833 - archive-conductor max cycle time in ns - archiveId=1
36: 0 - archive-conductor work cycle time exceeded count: threshold=1000000000ns - archiveId=1
37: 2 - Archive Control Sessions - archiveId=1
38: 1 - Archive Recording Sessions - archiveId=1
39: 1 - Archive Replay Sessions - archiveId=1
40: 301,795 - archive-recorder max write time in ns - archiveId=1
41: 2,816 - archive-recorder total write bytes - archiveId=1
42: 4,218,882 - archive-recorder total write time in ns - archiveId=1
43: 289,670 - archive-replayer max read time in ns - archiveId=1
44: 2,816 - archive-replayer total read bytes - archiveId=1
45: 3,033,200 - archive-replayer total read time in ns - archiveId=1
46: 1 - rcv-channel: aeron:udp?sparse=true|endpoint=archive-host:17000 10.1.0.2:17000
47: 1 - rcv-local-sockaddr: 46 10.1.0.2:17000
48: 1 - rcv-channel: aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:0 10.1.0.2:34358
49: 1 - rcv-local-sockaddr: 48 10.1.0.2:34358
50: 1,685,711,031,577 - client-heartbeat: 17
51: 1 - snd-channel: aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:17000 10.1.0.2:39677
52: 1 - snd-local-sockaddr: 51 10.1.0.2:39677
53: 320 - pub-pos (sampled): 19 -1888758186 10 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:17000
54: 33,088 - pub-lmt: 19 -1888758186 10 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:17000
55: 320 - snd-pos: 19 -1888758186 10 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:17000
56: 32,768 - snd-lmt: 19 -1888758186 10 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:17000
57: 0 - snd-bpe: 19 -1888758186 10 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:17000
58: 320 - sub-pos: 15 -1888758186 10 aeron:udp?sparse=true|endpoint=archive-host:17000 @0
59: 320 - rcv-hwm: 20 -1888758186 10 aeron:udp?sparse=true|endpoint=archive-host:17000
60: 320 - rcv-pos: 20 -1888758186 10 aeron:udp?sparse=true|endpoint=archive-host:17000
61: 1 - snd-channel: aeron:udp?endpoint=10.1.0.2:34358|mtu=1408|term-length=65536|sparse=true 10.1.0.2:38471
62: 1 - snd-local-sockaddr: 61 10.1.0.2:38471
63: 288 - pub-pos (sampled): 22 -1888758185 20 aeron:udp?endpoint=10.1.0.2:34358|mtu=1408|term-length=65536|sparse=true
64: 33,056 - pub-lmt: 22 -1888758185 20 aeron:udp?endpoint=10.1.0.2:34358|mtu=1408|term-length=65536|sparse=true
65: 288 - snd-pos: 22 -1888758185 20 aeron:udp?endpoint=10.1.0.2:34358|mtu=1408|term-length=65536|sparse=true
66: 32,768 - snd-lmt: 22 -1888758185 20 aeron:udp?endpoint=10.1.0.2:34358|mtu=1408|term-length=65536|sparse=true
67: 0 - snd-bpe: 22 -1888758185 20 aeron:udp?endpoint=10.1.0.2:34358|mtu=1408|term-length=65536|sparse=true
68: 288 - sub-pos: 18 -1888758185 20 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:0 @0
69: 288 - rcv-hwm: 23 -1888758185 20 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:0
70: 288 - rcv-pos: 23 -1888758185 20 aeron:udp?term-length=65536|sparse=true|mtu=1408|endpoint=10.1.0.2:0
71: 2,752 - pub-pos (sampled): 24 -1888758184 100 aeron:ipc
72: 33,554,432 - pub-lmt: 24 -1888758184 100 aeron:ipc
73: 448 - sub-pos: 15 -1773238685 10 aeron:udp?sparse=true|endpoint=archive-host:17000 @0
74: 448 - rcv-hwm: 25 -1773238685 10 aeron:udp?sparse=true|endpoint=archive-host:17000
75: 448 - rcv-pos: 25 -1773238685 10 aeron:udp?sparse=true|endpoint=archive-host:17000
76: 2,816 - sub-pos: 27 -1888758184 100 aeron:ipc @0
77: 2,816 - rec-pos: 0 -1888758184 100 aeron:ipc - archiveId=1
78: 1 - rcv-channel: aeron:udp?control-mode=dynamic|control=10.1.0.2:17001 0.0.0.0:42261
79: 1 - rcv-local-sockaddr: 78 0.0.0.0:42261
80: 1 - snd-channel: aeron:udp?endpoint=10.1.0.3:48079|mtu=1408|term-length=65536|sparse=true 10.1.0.2:57152
81: 1 - snd-local-sockaddr: 80 10.1.0.2:57152
82: 608 - pub-pos (sampled): 30 -1888758183 20 aeron:udp?endpoint=10.1.0.3:48079|mtu=1408|term-length=65536|sparse=true
83: 33,376 - pub-lmt: 30 -1888758183 20 aeron:udp?endpoint=10.1.0.3:48079|mtu=1408|term-length=65536|sparse=true
84: 608 - snd-pos: 30 -1888758183 20 aeron:udp?endpoint=10.1.0.3:48079|mtu=1408|term-length=65536|sparse=true
85: 32,768 - snd-lmt: 30 -1888758183 20 aeron:udp?endpoint=10.1.0.3:48079|mtu=1408|term-length=65536|sparse=true
86: 0 - snd-bpe: 30 -1888758183 20 aeron:udp?endpoint=10.1.0.3:48079|mtu=1408|term-length=65536|sparse=true
87: 1 - snd-channel: aeron:udp?endpoint=10.1.0.3:60279|mtu=1408|term-length=67108864|init-term-id=1821376038|term-id=1821376038|term-offset=0|linger=5000000000 10.1.0.2:60762
88: 1 - snd-local-sockaddr: 87 10.1.0.2:60762
89: 2,752 - pub-pos (sampled): 31 -1888758182 200 aeron:udp?endpoint=10.1.0.3:60279|mtu=1408|term-length=67108864|init-term-id=1821376038|term-id=1821376038|term-offset=0|linger=5000000000
90: 33,557,248 - pub-lmt: 31 -1888758182 200 aeron:udp?endpoint=10.1.0.3:60279|mtu=1408|term-length=67108864|init-term-id=1821376038|term-id=1821376038|term-offset=0|linger=5000000000
91: 2,816 - snd-pos: 31 -1888758182 200 aeron:udp?endpoint=10.1.0.3:60279|mtu=1408|term-length=67108864|init-term-id=1821376038|term-id=1821376038|term-offset=0|linger=5000000000
92: 131,072 - snd-lmt: 31 -1888758182 200 aeron:udp?endpoint=10.1.0.3:60279|mtu=1408|term-length=67108864|init-term-id=1821376038|term-id=1821376038|term-offset=0|linger=5000000000
93: 0 - snd-bpe: 31 -1888758182 200 aeron:udp?endpoint=10.1.0.3:60279|mtu=1408|term-length=67108864|init-term-id=1821376038|term-id=1821376038|term-offset=0|linger=5000000000
--
The archive data is written to the folder ~/jar/aeron-archive. The contents of the archive can be viewed by using the dump command of ArchiveTool (further details here).
Sample output is below, showing the 8-byte incrementing long clearly:
java -cp /root/aeron/aeron-all-*.jar io.aeron.archive.ArchiveTool . dump 4
INFO: Mark file exists: ./archive-mark.dat
13:08:42 (start: 2023-06-02 13:02:25, activity: 2023-06-02 13:08:41)
[MarkFileHeader](sbeTemplateId=200|sbeSchemaId=100|sbeSchemaVersion=1|sbeBlockLength=128):version=196864|activityTimestamp=1685711321814|startTimestamp=1685710945612|pid=7|controlStreamId=10|localControlStreamId=10|eventsStreamId=30|headerLength=8192|errorBufferLength=1048576|controlChannel='aeron:udp?endpoint=archive-host:17000'|localControlChannel='aeron:ipc'|eventsChannel='aeron:udp?control-mode=dynamic|control=archive-host:17001'|aeronDirectory='/dev/shm/aeron-root'
Catalog capacity in bytes: 1048576
Dumping up to 4 fragments per recording
Recording 0
channel: aeron:ipc
streamId: 100
stream length: -1
[RecordingDescriptorHeader](sbeTemplateId=21|sbeSchemaId=101|sbeSchemaVersion=7|sbeBlockLength=32):length=160|state=VALID|checksum=0|reserved=0
[RecordingDescriptor](sbeTemplateId=22|sbeSchemaId=101|sbeSchemaVersion=7|sbeBlockLength=80):controlSessionId=0|correlationId=0|recordingId=0|startTimestamp=1685710945741|stopTimestamp=-1|startPosition=0|stopPosition=-1|initialTermId=1821376038|segmentFileLength=134217728|termBufferLength=67108864|mtuLength=1408|sessionId=-1888758184|streamId=100|strippedChannel='aeron:ipc'|originalChannel='aeron:ipc'|sourceIdentity='aeron:ipc'
Frame at position [0] data at offset [32] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Frame at position [64] data at offset [96] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 02 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Frame at position [128] data at offset [160] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 03 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Frame at position [192] data at offset [224] with length = 8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 04 00 00 00 00 00 00 00 |........ |
+--------+-------------------------------------------------+----------------+
Specified frame limit 4 reached. Continue? (y/n): n
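To read the dump above by hand, two small calculations help: each 8-byte payload decodes as a little-endian long, and consecutive frames sit 64 bytes apart. The sketch below reproduces both in plain Java. DumpMath and its method names are hypothetical; the 32-byte header length is taken from the gap between each frame position and its data offset in the dump, and the 32-byte alignment is an assumption that is consistent with the 64-byte spacing shown.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/** Hypothetical helper for reading the dump by hand. */
final class DumpMath {
    static final int HEADER_LENGTH = 32;   // data offset minus frame position in the dump
    static final int FRAME_ALIGNMENT = 32; // assumption consistent with the 64-byte frame spacing

    private DumpMath() {
    }

    /** Interprets an 8-byte payload as a little-endian long. */
    static long decodeLong(byte[] payload) {
        return ByteBuffer.wrap(payload).order(ByteOrder.LITTLE_ENDIAN).getLong();
    }

    /** Space one frame occupies: header plus payload, rounded up to the alignment. */
    static int alignedFrameLength(int payloadLength) {
        int unaligned = HEADER_LENGTH + payloadLength;
        return (unaligned + FRAME_ALIGNMENT - 1) & -FRAME_ALIGNMENT;
    }

    public static void main(String[] args) {
        byte[] fourthFrame = {0x04, 0, 0, 0, 0, 0, 0, 0}; // payload of the frame at position 192
        System.out.println(decodeLong(fourthFrame));  // prints 4
        System.out.println(alignedFrameLength(8));    // prints 64
    }
}
```

If every message is an 8-byte long, the rec-pos value of 2,816 in the AeronStat output corresponds to 44 frames of 64 bytes each.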
The thread structure of the host can be seen by first getting the process PID by using jps, and then running pidstat (notice the archive-conductor thread):
root@archive-host:~/jar# jps
8 archive-host-0.1-SNAPSHOT-all.jar
62 Jps
root@archive-host:~/jar# pidstat -t -p 8
Linux 6.1.29-linuxkit (archive-host) 06/02/2023 _x86_64_ (16 CPU)
11:25:48 UID TGID TID %usr %system %guest %wait %CPU CPU Command
11:25:48 0 7 - 0.33 1.38 0.00 0.00 1.71 4 java
11:25:48 0 - 7 0.00 0.00 0.00 0.00 0.00 4 |__java
11:25:48 0 - 8 0.00 0.00 0.00 0.00 0.01 6 |__java
11:25:48 0 - 9 0.00 0.00 0.00 0.00 0.00 7 |__GC Thread#0
11:25:48 0 - 10 0.00 0.00 0.00 0.00 0.00 5 |__G1 Main Marker
11:25:48 0 - 11 0.00 0.00 0.00 0.00 0.00 0 |__G1 Conc#0
11:25:48 0 - 12 0.00 0.00 0.00 0.00 0.00 2 |__G1 Refine#0
11:25:48 0 - 13 0.00 0.00 0.00 0.00 0.00 5 |__G1 Service
11:25:48 0 - 14 0.00 0.00 0.00 0.00 0.00 1 |__VM Thread
11:25:48 0 - 15 0.00 0.00 0.00 0.00 0.00 1 |__Reference Handl
11:25:48 0 - 16 0.00 0.00 0.00 0.00 0.00 5 |__Finalizer
11:25:48 0 - 17 0.00 0.00 0.00 0.00 0.00 7 |__Signal Dispatch
11:25:48 0 - 18 0.00 0.00 0.00 0.00 0.00 7 |__Service Thread
11:25:48 0 - 19 0.00 0.00 0.00 0.00 0.00 4 |__Monitor Deflati
11:25:48 0 - 20 0.01 0.00 0.00 0.00 0.01 2 |__C2 CompilerThre
11:25:48 0 - 21 0.00 0.00 0.00 0.00 0.01 4 |__C1 CompilerThre
11:25:48 0 - 22 0.00 0.00 0.00 0.00 0.00 5 |__Sweeper thread
11:25:48 0 - 23 0.00 0.00 0.00 0.00 0.00 4 |__Common-Cleaner
11:25:48 0 - 24 0.00 0.00 0.00 0.00 0.00 0 |__Notification Th
11:25:48 0 - 25 0.00 0.01 0.00 0.00 0.01 5 |__VM Periodic Tas
11:25:48 0 - 26 0.11 0.42 0.00 0.10 0.54 0 |__[sender,receive
11:25:48 0 - 27 0.09 0.38 0.00 0.11 0.47 7 |__archive-conduct
11:25:48 0 - 28 0.05 0.29 0.00 0.17 0.33 3 |__aeron-client-co
11:25:48 0 - 29 0.05 0.29 0.00 0.17 0.33 7 |__agent-host