Concurrent Collections
Ring Buffers¶
OneToOneRingBuffer¶
When you've built a process which uses multiple threads, have a single producer and a single consumer for a specific flow, and don't want to deal with the complexity of Aeron IPC, then Agrona's OneToOneRingBuffer
is a good option.
Two samples using OneToOneRingBuffer are available on GitHub.
Storage for this ring buffer is defined upfront, and cannot be resized. In the sample below, the underlying buffer is allocated off heap, and is set to accept 4096 bytes of content. The addition of the RingBufferDescriptor.TRAILER_LENGTH
is required so that the data supporting the ring buffer is held within the same underlying buffer. Note that any data written to the ring buffer via write
has an additional 8 byte header written with it, so be sure to keep this overhead in mind when sizing the underlying buffer.
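A minimal sketch of this setup, assuming Agrona is on the classpath. The class name RingBufferSetup and the constant DATA_CAPACITY are illustrative only. Agrona expects the data capacity (excluding the trailer) to be a power of two.

```java
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

import java.nio.ByteBuffer;

public class RingBufferSetup
{
    // Space for message records (illustrative value); must be a power of two.
    static final int DATA_CAPACITY = 4096;

    static OneToOneRingBuffer create()
    {
        // The trailer holds the ring buffer's own bookkeeping (producer and consumer positions),
        // so it is added on top of the space reserved for messages.
        final int totalLength = DATA_CAPACITY + RingBufferDescriptor.TRAILER_LENGTH;

        // allocateDirect places the storage off heap.
        final UnsafeBuffer buffer = new UnsafeBuffer(ByteBuffer.allocateDirect(totalLength));
        return new OneToOneRingBuffer(buffer);
    }

    public static void main(final String[] args)
    {
        final OneToOneRingBuffer ringBuffer = create();
        System.out.println("capacity=" + ringBuffer.capacity());
    }
}
```

The reported capacity excludes the trailer, and each record consumes its 8 byte header in addition to the payload.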
Consuming the data is done via an implementation of the Agrona MessageHandler
interface, for example:
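A minimal sketch of such a handler, assuming every message carries a single int. The class name LastValueHandler and the roundTrip helper are hypothetical and exist only to show the handler being driven by read.

```java
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.MessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

import java.nio.ByteBuffer;

public class LastValueHandler implements MessageHandler
{
    private int lastValue;

    @Override
    public void onMessage(final int msgTypeId, final MutableDirectBuffer buffer, final int index, final int length)
    {
        // The payload starts at index and is length bytes long.
        lastValue = buffer.getInt(index);
    }

    public int lastValue()
    {
        return lastValue;
    }

    // Illustrative helper: writes one int and reads it back through the handler.
    static int roundTrip(final int value)
    {
        final OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
            new UnsafeBuffer(ByteBuffer.allocateDirect(1024 + RingBufferDescriptor.TRAILER_LENGTH)));

        final UnsafeBuffer src = new UnsafeBuffer(new byte[Integer.BYTES]);
        src.putInt(0, value);
        ringBuffer.write(1, src, 0, Integer.BYTES);

        final LastValueHandler handler = new LastValueHandler();
        ringBuffer.read(handler);
        return handler.lastValue();
    }
}
```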
You'll notice that this is a very similar interface to the Aeron FragmentHandler. This makes moving from this collection object to Aeron relatively simple. The msgTypeId
is used to identify the message type within the record header. Even if you do not use this field, it must be set to a value > 0.
Sending data is very simple, but does not provide the same level of information as Aeron's Publication. An example:
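A minimal sketch of a write with a bounded retry, assuming int payloads. The class RetryingWriter, the writeWithRetry helper and the demo method are hypothetical names, and Thread.yield() stands in for whatever idle strategy suits the application.

```java
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

import java.nio.ByteBuffer;

public class RetryingWriter
{
    static final int MSG_TYPE_ID = 1; // must be > 0

    // Attempts the write up to maxAttempts times, yielding between attempts.
    static boolean writeWithRetry(
        final OneToOneRingBuffer ringBuffer, final UnsafeBuffer src, final int length, final int maxAttempts)
    {
        for (int attempt = 0; attempt < maxAttempts; attempt++)
        {
            if (ringBuffer.write(MSG_TYPE_ID, src, 0, length))
            {
                return true;
            }
            Thread.yield(); // give the consumer a chance to free some space
        }
        return false;
    }

    // Fills a small ring buffer until a write is rejected, drains it, then writes again.
    static boolean demo()
    {
        final OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
            new UnsafeBuffer(ByteBuffer.allocateDirect(128 + RingBufferDescriptor.TRAILER_LENGTH)));
        final UnsafeBuffer src = new UnsafeBuffer(new byte[Integer.BYTES]);
        src.putInt(0, 7);

        int written = 0;
        while (written < 1_000 && ringBuffer.write(MSG_TYPE_ID, src, 0, Integer.BYTES))
        {
            written++;
        }
        final boolean sawRejectedWrite = written < 1_000;

        // Drain everything, as a consumer would in its duty cycle.
        while (ringBuffer.read((msgTypeId, buffer, index, length) -> { }) > 0)
        {
            // keep reading until the ring buffer is empty
        }

        return sawRejectedWrite && writeWithRetry(ringBuffer, src, Integer.BYTES, 3);
    }
}
```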
The write method returns a boolean indicating whether the write succeeded. If the value is false, the write failed, typically because the ring buffer had insufficient free capacity. This can be treated much like back pressure in Aeron Publications - if appropriate, give the consumer time to read some data, and retry the write.
The ring buffer can be monitored for both producer and consumer progress via methods on the ring buffer object.
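A minimal sketch of such monitoring, using the producerPosition, consumerPosition, size and capacity methods of Agrona's RingBuffer interface. The class RingBufferMonitor and its helper methods are hypothetical names.

```java
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

import java.nio.ByteBuffer;

public class RingBufferMonitor
{
    // Summarises producer and consumer progress plus the unread backlog in bytes.
    static String describe(final RingBuffer ringBuffer)
    {
        return "producer=" + ringBuffer.producerPosition() +
            " consumer=" + ringBuffer.consumerPosition() +
            " backlog=" + ringBuffer.size() + "/" + ringBuffer.capacity();
    }

    // Returns the backlog in bytes before and after a single message is consumed.
    static int[] backlogBeforeAndAfterRead()
    {
        final OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
            new UnsafeBuffer(ByteBuffer.allocateDirect(1024 + RingBufferDescriptor.TRAILER_LENGTH)));
        final UnsafeBuffer src = new UnsafeBuffer(new byte[Integer.BYTES]);
        ringBuffer.write(1, src, 0, Integer.BYTES);

        final int before = ringBuffer.size();
        ringBuffer.read((msgTypeId, buffer, index, length) -> { });
        final int after = ringBuffer.size();

        return new int[] {before, after};
    }
}
```

The backlog includes record headers and alignment padding, so it is larger than the sum of the payload lengths.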
Using this object between Agrona agents is simple - just share the OneToOneRingBuffer
between the two agents, and use the write
method as you would an Aeron Publication
and poll using the read
method in the consumer agent's duty cycle.
Controlled Message Handler¶
Agrona 1.9.0 adds the Controlled Message Handler, which operates much like the version found in Aeron's Image object.
The difference is in the return value of type ControlledMessageHandler.Action
- this allows control over what happens with the message. There are 4 options:
- ABORT: This aborts the read operation for this message. It will be delivered again on the next read.
- BREAK: This stops further processing after the current message for this read.
- COMMIT: Continues processing, but commits at the current message.
- CONTINUE: Continues processing, committing at the end of the current batch (this is equivalent to the standard handler).
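A minimal sketch of a controlled handler, assuming Agrona 1.9.0 or later. The class AbortOnceHandler is a hypothetical example that aborts the first delivery it sees and accepts everything afterwards, which makes the redelivery behaviour of ABORT visible through the counts returned by controlledRead.

```java
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.ControlledMessageHandler;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

import java.nio.ByteBuffer;

public class AbortOnceHandler implements ControlledMessageHandler
{
    private boolean aborted;

    @Override
    public ControlledMessageHandler.Action onMessage(
        final int msgTypeId, final MutableDirectBuffer buffer, final int index, final int length)
    {
        if (!aborted)
        {
            // Reject the first delivery; the same message is offered again on the next read.
            aborted = true;
            return ControlledMessageHandler.Action.ABORT;
        }
        return ControlledMessageHandler.Action.CONTINUE;
    }

    // Writes three messages and returns the message counts of two consecutive controlled reads.
    static int[] demo()
    {
        final OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
            new UnsafeBuffer(ByteBuffer.allocateDirect(1024 + RingBufferDescriptor.TRAILER_LENGTH)));
        final UnsafeBuffer src = new UnsafeBuffer(new byte[Integer.BYTES]);
        for (int i = 0; i < 3; i++)
        {
            src.putInt(0, i);
            ringBuffer.write(1, src, 0, Integer.BYTES);
        }

        final AbortOnceHandler handler = new AbortOnceHandler();
        final int firstRead = ringBuffer.controlledRead(handler);  // aborted on the first message
        final int secondRead = ringBuffer.controlledRead(handler); // all messages delivered
        return new int[] {firstRead, secondRead};
    }
}
```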
TryClaim¶
Also new in 1.9.0 is tryClaim
, which allows writing directly to the ring buffer. This gives the ring buffer zero copy semantics.
int claimIndex = ringBuffer.tryClaim(1, Integer.BYTES);
if (claimIndex > 0)
{
    final AtomicBuffer buffer = ringBuffer.buffer();
    buffer.putInt(claimIndex, something);
    ringBuffer.commit(claimIndex);
}
return 0;
TryClaim operates similarly to how it does in Aeron:
- First, call tryClaim to get an index for a pre-defined length. If that index is > 0, you can continue and use the index as the offset in the buffer.put call.
- Next, get the ring buffer's underlying buffer (if you don't already have it).
- Next, write the data.
- Finally, commit (or abort). Note that there is no unblock timeout like in Aeron - if commit (or abort) is not called, then the consumer will not be able to consume past the point of the first uncommitted/non aborted claim.
See Send Agent example of OneToOneRingBuffer for an example.
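The steps above, including the abort path, can be sketched as follows. This assumes Agrona 1.9.0 or later; the class ClaimCommitAbort and its methods are hypothetical names. An aborted claim is skipped by the consumer, so only the committed message is delivered.

```java
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.OneToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

import java.nio.ByteBuffer;

public class ClaimCommitAbort
{
    static final int MSG_TYPE_ID = 1;

    // Claims space, writes the value in place and commits; aborts the claim if the write throws.
    static boolean claimAndWrite(final OneToOneRingBuffer ringBuffer, final int value)
    {
        final int index = ringBuffer.tryClaim(MSG_TYPE_ID, Integer.BYTES);
        if (index <= 0)
        {
            return false; // no space was claimed, so there is nothing to commit or abort
        }

        try
        {
            ringBuffer.buffer().putInt(index, value);
        }
        catch (final RuntimeException ex)
        {
            ringBuffer.abort(index); // never leave a claim dangling, or the consumer stalls behind it
            throw ex;
        }

        ringBuffer.commit(index);
        return true;
    }

    // Returns {messagesRead, lastValueRead} after one aborted claim and one committed claim.
    static int[] demo()
    {
        final OneToOneRingBuffer ringBuffer = new OneToOneRingBuffer(
            new UnsafeBuffer(ByteBuffer.allocateDirect(1024 + RingBufferDescriptor.TRAILER_LENGTH)));

        final int abortedIndex = ringBuffer.tryClaim(MSG_TYPE_ID, Integer.BYTES);
        if (abortedIndex > 0)
        {
            ringBuffer.abort(abortedIndex);
        }

        claimAndWrite(ringBuffer, 7);

        final int[] result = new int[2];
        result[0] = ringBuffer.read((msgTypeId, buffer, index, length) -> result[1] = buffer.getInt(index));
        return result;
    }
}
```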
ManyToOneRingBuffer¶
The ManyToOneRingBuffer
is used exactly like the OneToOneRingBuffer
, but allows for multiple producers and a single consumer.
See Three Agent example of ManyToOneRingBuffer
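A minimal sketch with two producer threads and the calling thread acting as the single consumer. The class ManyToOneDemo is a hypothetical name, and the ten second deadline is only a guard so the sketch cannot spin forever.

```java
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;

import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

public class ManyToOneDemo
{
    // Starts two producers writing messagesPerProducer ints each; returns how many were consumed.
    static int run(final int messagesPerProducer)
    {
        final ManyToOneRingBuffer ringBuffer = new ManyToOneRingBuffer(
            new UnsafeBuffer(ByteBuffer.allocateDirect(1024 + RingBufferDescriptor.TRAILER_LENGTH)));

        final Runnable producer = () ->
        {
            // Each producer uses its own source buffer; only the ring buffer is shared.
            final UnsafeBuffer src = new UnsafeBuffer(new byte[Integer.BYTES]);
            for (int i = 0; i < messagesPerProducer; i++)
            {
                src.putInt(0, i);
                while (!ringBuffer.write(1, src, 0, Integer.BYTES))
                {
                    Thread.yield(); // ring buffer is full, wait for the consumer
                }
            }
        };

        for (int p = 0; p < 2; p++)
        {
            final Thread thread = new Thread(producer, "producer-" + p);
            thread.setDaemon(true);
            thread.start();
        }

        final int expected = 2 * messagesPerProducer;
        final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
        int received = 0;
        while (received < expected && System.nanoTime() - deadline < 0)
        {
            received += ringBuffer.read((msgTypeId, buffer, index, length) -> { });
        }
        return received;
    }
}
```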
Broadcast¶
The OneToOneRingBuffer
and ManyToOneRingBuffer
implementations allow you to have one or many producers, but only a single consumer. Sometimes you might need one producer and one to many consumers - Agrona offers a BroadcastTransmitter
and BroadcastReceiver
for this.
Warning
BroadcastTransmitter
and BroadcastReceiver
will drop messages if the sender is producing faster than consumers can consume. There is no back pressure support - if this is required, rather consider Aeron for one to many transmission tasks.
All communication is managed via a single shared AtomicBuffer
. See GitHub for an example.
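A minimal single-threaded sketch of sharing one buffer between a transmitter and a receiver. The class BroadcastRoundTrip is a hypothetical name. As with the ring buffers, the shared buffer is sized as a power-of-two data capacity plus a trailer, here BroadcastBufferDescriptor.TRAILER_LENGTH.

```java
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.broadcast.BroadcastBufferDescriptor;
import org.agrona.concurrent.broadcast.BroadcastReceiver;
import org.agrona.concurrent.broadcast.BroadcastTransmitter;
import org.agrona.concurrent.broadcast.CopyBroadcastReceiver;

import java.nio.ByteBuffer;

public class BroadcastRoundTrip
{
    // Transmits one int and returns the value seen by the receiver, or -1 if nothing was received.
    static int demo(final int value)
    {
        final int capacity = 1024; // must be a power of two
        final UnsafeBuffer shared = new UnsafeBuffer(
            ByteBuffer.allocateDirect(capacity + BroadcastBufferDescriptor.TRAILER_LENGTH));

        // Both sides wrap the same underlying buffer.
        final BroadcastTransmitter transmitter = new BroadcastTransmitter(shared);
        final CopyBroadcastReceiver receiver = new CopyBroadcastReceiver(new BroadcastReceiver(shared));

        final UnsafeBuffer msg = new UnsafeBuffer(new byte[Integer.BYTES]);
        msg.putInt(0, value);
        transmitter.transmit(1, msg, 0, Integer.BYTES);

        final int[] received = new int[1];
        final int count = receiver.receive(
            (msgTypeId, buffer, index, length) -> received[0] = buffer.getInt(index));

        return count == 1 ? received[0] : -1;
    }
}
```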
Sending¶
Sending is performed using a BroadcastTransmitter
. A buffer must be prepared, and then handed over to the BroadcastTransmitter
to transmit.
private final BroadcastTransmitter transmitter;
private final MutableDirectBuffer msgBuffer = new ExpandableArrayBuffer();

public SendAgent(final AtomicBuffer buffer...)
{
    this.transmitter = new BroadcastTransmitter(buffer);
}

@Override
public int doWork()
{
    ...
    msgBuffer.putInt(0, lastSend);
    transmitter.transmit(1, msgBuffer, 0, Integer.BYTES);
    ...
    lastSend++;
}
Receiving¶
Receiving is performed using a CopyBroadcastReceiver
. This makes it simpler to receive messages and allows receipt of messages via a MessageHandler
interface.
public class ReceiveAgent implements Agent, MessageHandler
{
    ...
    private final BroadcastReceiver broadcastReceiver;
    private final CopyBroadcastReceiver copyBroadcastReceiver;

    public ReceiveAgent(final AtomicBuffer atomicBuffer, final String name)
    {
        this.broadcastReceiver = new BroadcastReceiver(atomicBuffer);
        this.copyBroadcastReceiver = new CopyBroadcastReceiver(broadcastReceiver);
        ...
    }

    @Override
    public int doWork()
    {
        copyBroadcastReceiver.receive(this::onMessage);
        return 0;
    }

    @Override
    public void onMessage(int msgTypeId, MutableDirectBuffer buffer, int index, int length)
    {
        LOGGER.info("Received {}", buffer.getInt(index));
    }
    ...
}
If the receiver is running behind, then the following exception is raised:
java.lang.IllegalStateException: unable to keep up with broadcast