Data Retention Regulator

Overview

The Data Retention Regulator is a tool for managing the lifetime of data in Aeron Cluster and Aeron Archive to help avoid running out of disk space.

How to get the Data Retention Regulator

The Data Retention Regulator is a Premium Aeron component. The binaries are accessible from the Adaptive Artifactory.

<groupId>io.aeron.premium.drr</groupId>
<artifactId>aeron-data-retention-regulator</artifactId>

Usage

The tool can be run from the command line or programmatically.

CLI usage

java \
    --add-opens java.base/jdk.internal.misc=ALL-UNNAMED \
    -Daeron.archive.dir=/path/to/archive/dir \
    -jar /path/to/aeron-data-retention-regulator-all.jar \
    a.properties b.properties

where a.properties and b.properties are files containing system property values to configure the regulator’s policies.

Note that system property values take precedence over properties in the files, and property values in "later" files take precedence over those in "earlier" files, e.g., b.properties takes precedence over a.properties and -Dfoo.bar=baz takes precedence over both.
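The precedence rules above can be illustrated with a minimal sketch that uses only java.util.Properties. It is not the regulator's own loading code; the class name PropertyPrecedenceSketch, the method resolve, and the key only.in.a are hypothetical, and the file contents are passed as strings to keep the example self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration of the precedence rules; not part of the regulator's API.
final class PropertyPrecedenceSketch
{
    /**
     * Merge property sources so that later files override earlier files,
     * and the overrides (standing in for -D system properties) win over all files.
     */
    static Properties resolve(final List<String> fileContents, final Properties systemOverrides)
    {
        final Properties resolved = new Properties();
        for (final String content : fileContents)
        {
            final Properties fromFile = new Properties();
            try
            {
                fromFile.load(new StringReader(content));
            }
            catch (final IOException ex)
            {
                throw new UncheckedIOException(ex);
            }
            // Keys from a later file replace keys from an earlier file.
            resolved.putAll(fromFile);
        }
        // System property values are applied last, so they take precedence over every file.
        resolved.putAll(systemOverrides);
        return resolved;
    }

    public static void main(final String[] args)
    {
        final String a = "foo.bar=from-a\nonly.in.a=1\n";
        final String b = "foo.bar=from-b\n";
        final Properties overrides = new Properties();
        overrides.setProperty("foo.bar", "baz");

        final Properties resolved = resolve(List.of(a, b), overrides);
        System.out.println(resolved.getProperty("foo.bar"));   // prints "baz"
        System.out.println(resolved.getProperty("only.in.a")); // prints "1"
    }
}
```

Without the override, foo.bar would resolve to from-b, because b.properties is listed after a.properties.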

Programmatic usage

final DataRetentionRegulator.Context context = new DataRetentionRegulator.Context()
    .archiveDir(archiveDir)
    .rootPolicy(policy);

try (DataRetentionRegulator regulator = new DataRetentionRegulator(context))
{
    regulator.execute();
}

where policy is an instance of DataRetentionPolicy.

Example policies

The data retention regulator uses a root policy to determine what to do with data. Below are some examples of policies that can be used.

Keeping N snapshots worth of data (incl. log) for recovery

The RetainEnoughDataForRecoveryPolicy policy keeps the last N snapshots and the log following the earliest of these snapshots available for cluster node recovery.

  • It will detach and delete the log segments prior to the earliest snapshot.

  • It will also delete any snapshot recordings that are no longer needed.

  • It will only consider valid snapshots.

  • Until the recording.log is available, it will specify that all data of all recordings should be retained so that other policies do not "accidentally" delete log data.

  • When the standby directory and cluster identifier are provided, it assumes a TransitionModule is running, and it will determine the snapshots to keep based on the snapshots available in the active recording.log, i.e., from the consensus module’s recording.log when in the CONSENSUS state and from the standby’s recording.log when in the STANDBY state.

  • To run against a ClusterStandby node, without a TransitionModule, the clusterDirName (or cluster.dir) policy property should be set to the standby directory.

It can be configured via properties or programmatically.

When not set via an explicit policy property or programmatically:

  • clusterId will fall back to the system property value aeron.cluster.id,

  • clusterDirName will fall back to the system property value aeron.cluster.dir, and

  • standbyDirName will fall back to the system property value aeron.cluster.standby.dir.

For example, to retain at least 5 snapshots worth of data for recovery:

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.cluster.RetainEnoughDataForRecoveryPolicy
# The number of snapshots to keep
aeron.data.retention.policies.myPolicy.snapshot.count=5
# Whether to include snapshots from standby nodes in calculations
aeron.data.retention.policies.myPolicy.include.standby.snapshots=true
# The consensus module directory
aeron.data.retention.policies.myPolicy.cluster.dir=/path/to/consensus
# The (optional) cluster standby directory and cluster id (when running against a node using a TransitionModule)
aeron.data.retention.policies.myPolicy.standby.dir=/path/to/standby
aeron.data.retention.policies.myPolicy.cluster.id=1

final RetainEnoughDataForRecoveryPolicy.Context context = new RetainEnoughDataForRecoveryPolicy.Context()
    .snapshotCount(5)
    .includeStandbySnapshotsInCalcuations(true)
    .clusterDirName("/path/to/consensus")
    .standbyDirName("/path/to/standby")
    .clusterId(1);
final DataRetentionPolicy policy = new RetainEnoughDataForRecoveryPolicy(context);

Keeping N bytes of data per recording

The RetainLengthPolicy policy keeps at least the last N bytes of data. It will detach and delete recording segments before the last N bytes.

It can be configured via properties or programmatically.

For example, to retain at least 10 GB of data per recording:

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
# The number of bytes to keep
aeron.data.retention.policies.myPolicy.length=10g

final RetainLengthPolicy.Context context = new RetainLengthPolicy.Context()
    .length(10 * 1024 * 1024 * 1024L);
final DataRetentionPolicy policy = new RetainLengthPolicy(context);
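To make "at least the last N bytes" concrete, the following sketch computes a new recording start position that drops only whole segments. It is an illustration under stated assumptions, not the policy's implementation: the class RetainLengthSketch and its method are hypothetical, and the recording's start position is assumed to sit on a segment boundary.

```java
// Hypothetical illustration of segment-aligned length retention; not the policy's actual code.
final class RetainLengthSketch
{
    /**
     * Propose a new start position such that at least retainLength bytes before
     * stopPosition are kept and only whole segments are dropped.
     * Assumes startPosition is aligned to a segment boundary.
     */
    static long newStartPosition(
        final long startPosition, final long stopPosition, final long segmentLength, final long retainLength)
    {
        final long earliestNeeded = stopPosition - retainLength;
        if (earliestNeeded <= startPosition)
        {
            // The recording is not yet longer than the retention length: keep everything.
            return startPosition;
        }

        // Round down to the start of the segment that contains the earliest needed byte.
        final long segmentsToDrop = (earliestNeeded - startPosition) / segmentLength;
        return startPosition + (segmentsToDrop * segmentLength);
    }

    public static void main(final String[] args)
    {
        // Segments of 10 bytes, a recording spanning [0, 25), retaining at least 12 bytes.
        System.out.println(newStartPosition(0, 25, 10, 12)); // prints 10, so 15 bytes remain
    }
}
```

Because the start position is rounded down to a segment boundary, the retained length is usually somewhat more than N, which matches the "at least" wording above.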

Applying policies to particular recordings

The MatchingStreamsPolicy policy applies an inner policy to recordings that match a channel URI and/or stream identifier.

  • It will only apply the inner policy to recordings that match the channel URI predicate and/or stream ID.

  • When a stream identifier is provided, it will only match recordings that have that stream identifier.

  • When a channel URI predicate is provided, it will only match recordings with original channel URIs that pass the predicate.

  • When both stream identifier and channel URI predicate are provided, it will only match recordings that match both.

  • When using properties, the original.channel.fragment accepts a regular expression pattern to match part of the original channel URI.

It can be configured via properties or programmatically.

For example, to retain at least 10 GB of data per recording for recordings with the channel URI fragment alias=foo:

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.MatchingStreamsPolicy
# The channel URI fragment to match
aeron.data.retention.policies.myPolicy.original.channel.fragment=alias\=foo(\\||$)
# The stream ID to match
aeron.data.retention.policies.myPolicy.stream.id=1
# The inner policy
aeron.data.retention.policies.myPolicy.inner.policy=myInnerPolicy
# For example:
aeron.data.retention.policies.myInnerPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.myInnerPolicy.length=10g

final DataRetentionPolicy innerPolicy = someInnerPolicy();
final MatchingStreamsPolicy.Context context = new MatchingStreamsPolicy.Context()
    .originalChannelFilter(Pattern.compile("alias=foo(\\||$)").asPredicate())
    .streamId(1)
    .delegate(innerPolicy);
final DataRetentionPolicy policy = new MatchingStreamsPolicy(context);
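The pattern used above ends with (\||$) so that alias=foo matches only as a complete URI parameter and not as a prefix of a longer alias. The following sketch checks the same pattern against a few channel URIs using only java.util.regex; the class name and the example URIs are illustrative assumptions.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Illustrative check of the channel fragment pattern; the URIs below are made-up examples.
final class ChannelFragmentMatchSketch
{
    // Matches "alias=foo" only when followed by the parameter separator '|' or the end of the URI.
    static final Predicate<String> ALIAS_FOO = Pattern.compile("alias=foo(\\||$)").asPredicate();

    public static void main(final String[] args)
    {
        System.out.println(ALIAS_FOO.test("aeron:udp?endpoint=localhost:20121|alias=foo"));    // prints true
        System.out.println(ALIAS_FOO.test("aeron:udp?alias=foo|endpoint=localhost:20121"));    // prints true
        System.out.println(ALIAS_FOO.test("aeron:udp?endpoint=localhost:20121|alias=foobar")); // prints false
    }
}
```

Note that asPredicate() uses find semantics, so the pattern only needs to match part of the original channel URI. In a properties file the same pattern is written with the = and \ characters escaped, as shown in the properties example.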

Applying policies to the Consensus Module’s log recording

The MatchingRecordingLogPolicy policy applies an inner policy to the Consensus Module’s log recording.

  • Until the recording.log is available, it will specify that all data of all recordings should be retained so that other policies do not "accidentally" delete log data.

  • When the standby directory and cluster identifier are provided, it assumes a TransitionModule is running, and it will determine the recording for the log based on the active recording.log, i.e., from the consensus module’s recording.log when in the CONSENSUS state and from the standby’s recording.log when in the STANDBY state. Normally, these recordings are the same.

  • To run against a ClusterStandby node, without a TransitionModule, the clusterDirName (or cluster.dir) policy property should be set to the standby directory.

It can be configured via properties or programmatically.

When not set via an explicit policy property or programmatically:

  • clusterId will fall back to the system property value aeron.cluster.id,

  • clusterDirName will fall back to the system property value aeron.cluster.dir, and

  • standbyDirName will fall back to the system property value aeron.cluster.standby.dir.

For example, to retain 10 GB of log recording:

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.cluster.MatchingRecordingLogPolicy
# The consensus module directory
aeron.data.retention.policies.myPolicy.cluster.dir=/path/to/consensus
# The (optional) cluster standby directory and cluster id (when running against a node using a TransitionModule)
aeron.data.retention.policies.myPolicy.standby.dir=/path/to/standby
aeron.data.retention.policies.myPolicy.cluster.id=1
# The inner policy
aeron.data.retention.policies.myPolicy.inner.policy=retainLengthPolicy
# For example:
aeron.data.retention.policies.retainLengthPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.retainLengthPolicy.length=10g

final DataRetentionPolicy innerPolicy = somePolicy();
final MatchingRecordingLogPolicy.Context context = new MatchingRecordingLogPolicy.Context()
    .clusterDirName("/path/to/consensus")
    .standbyDirName("/path/to/standby")
    .clusterId(1)
    .delegate(innerPolicy);
final DataRetentionPolicy policy = new MatchingRecordingLogPolicy(context);

Retaining the maximum of multiple policies

The RetainMaxOfPolicy policy safely combines policies, e.g., to retain X snapshots worth of cluster data and Y bytes of data per recording, by retaining the maximum of its inner policies.

It can be configured via properties or programmatically.

For example, to retain at least 5 snapshots worth of data for recovery and at least 10 GB of data per recording:

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.RetainMaxOfPolicy
# The inner policies
aeron.data.retention.policies.myPolicy.inner.policies=retainClusterDataPolicy,retainLengthPolicy
# For example:
aeron.data.retention.policies.retainClusterDataPolicy.class=io.aeron.data.retention.regulator.policies.cluster.RetainEnoughDataForRecoveryPolicy
aeron.data.retention.policies.retainClusterDataPolicy.snapshot.count=5
aeron.data.retention.policies.retainClusterDataPolicy.include.standby.snapshots=true
aeron.data.retention.policies.retainClusterDataPolicy.cluster.dir=/path/to/consensus
aeron.data.retention.policies.retainLengthPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.retainLengthPolicy.length=10g

final DataRetentionPolicy innerPolicy1 = somePolicy();
final DataRetentionPolicy innerPolicy2 = someOtherPolicy();
final RetainMaxOfPolicy.Context context = new RetainMaxOfPolicy.Context()
    .policies(innerPolicy1, innerPolicy2);
final DataRetentionPolicy policy = new RetainMaxOfPolicy(context);
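One way to read "retaining the maximum" is that the combined result keeps the earliest start position proposed by any inner policy, since an earlier start position means more data is kept. The sketch below shows that reading only; it is an assumption about the semantics, and the class RetainMaxOfSketch and its method are hypothetical rather than part of the regulator's API.

```java
// Hypothetical illustration: the most conservative proposal is the smallest start position.
final class RetainMaxOfSketch
{
    /**
     * Combine the start positions proposed by several policies by choosing the
     * earliest one, which retains the most data.
     */
    static long combinedStartPosition(final long... proposedStartPositions)
    {
        if (proposedStartPositions.length == 0)
        {
            throw new IllegalArgumentException("at least one proposal is required");
        }

        long earliest = proposedStartPositions[0];
        for (final long proposed : proposedStartPositions)
        {
            earliest = Math.min(earliest, proposed);
        }

        return earliest;
    }

    public static void main(final String[] args)
    {
        // A snapshot-based proposal of 4096 and a length-based proposal of 1024.
        System.out.println(combinedStartPosition(4096, 1024)); // prints 1024
    }
}
```

Under this reading, no inner policy's retention requirement is violated by another inner policy's willingness to delete, which is why the combination is described as safe.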

Purging recordings that stopped some time ago

The DeleteRecordingsAfterPeriodPolicy policy deletes recordings that stopped some period of time ago. Note that, unlike RetainLengthPolicy, this policy does not specify any retention. It will only specify recordings to delete. Therefore, when used in conjunction with other policies, it will not prevent other policies from deleting segments or recordings before the retention period elapses.

It can be configured via properties or programmatically.

For example, to delete recordings that stopped at least 10 days ago:

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.DeleteRecordingsAfterPeriodPolicy
# The period to wait before deleting recordings
aeron.data.retention.policies.myPolicy.period=10
aeron.data.retention.policies.myPolicy.period.unit=DAYS

final DeleteRecordingsAfterPeriodPolicy.Context context = new DeleteRecordingsAfterPeriodPolicy.Context()
    .retentionPeriod(10)
    .retentionPeriodUnit(TimeUnit.DAYS);

final DataRetentionPolicy policy = new DeleteRecordingsAfterPeriodPolicy(context);

Detaching and deleting old segment files

The DeleteSegmentsAfterPeriodPolicy policy detaches and deletes recording segment files that have not been modified for some period of time. It will scan the segment files from the start of the recording until it finds a segment that has been modified within the retention period.

The coverage, which is particularly important to consider in cascades, depends on the configured retention mode:

  • NO_RETENTION (default): The policy will only "cover" recordings that have segments due for deletion, i.e., segments last modified before the retention period.

  • RETENTION_UNTIL_DELETION: The policy will "cover" all recordings, i.e., the policy will ensure data is kept for the retention period by proposing a recording start position based on the earliest segment that was modified within the retention period, or, if no segment files were, the most recent segment file.

It can be configured via properties or programmatically.

For example, to retain recording segments for 10 days and then delete them:

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.DeleteSegmentsAfterPeriodPolicy
# The period to wait before deleting recording segments
aeron.data.retention.policies.myPolicy.period=10
aeron.data.retention.policies.myPolicy.period.unit=DAYS
aeron.data.retention.policies.myPolicy.retention.mode=RETENTION_UNTIL_DELETION

final DeleteSegmentsAfterPeriodPolicy.Context context = new DeleteSegmentsAfterPeriodPolicy.Context()
    .retentionPeriod(10)
    .retentionPeriodUnit(TimeUnit.DAYS)
    .retentionMode(DeleteSegmentsAfterPeriodPolicy.RetentionMode.RETENTION_UNTIL_DELETION);

final DataRetentionPolicy policy = new DeleteSegmentsAfterPeriodPolicy(context);
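The scan described above, from the start of the recording until the first recently modified segment, can be sketched as follows. This is a simplified model rather than the policy's implementation: the class SegmentScanSketch is hypothetical, last-modified times are passed in as epoch milliseconds instead of being read from files, and the retention modes are not modelled.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the leading-segment scan; file access is replaced by a list of timestamps.
final class SegmentScanSketch
{
    /**
     * Count the leading segments, in recording order, that were last modified before
     * the retention period. The scan stops at the first segment modified within the period.
     */
    static int countExpiredLeadingSegments(
        final List<Long> lastModifiedMs, final long nowMs, final long period, final TimeUnit unit)
    {
        final long cutoffMs = nowMs - unit.toMillis(period);
        int expired = 0;
        for (final long modifiedMs : lastModifiedMs)
        {
            if (modifiedMs >= cutoffMs)
            {
                break;
            }
            expired++;
        }

        return expired;
    }

    public static void main(final String[] args)
    {
        final long nowMs = TimeUnit.DAYS.toMillis(20);
        final List<Long> lastModifiedMs = List.of(
            TimeUnit.DAYS.toMillis(1),
            TimeUnit.DAYS.toMillis(5),
            TimeUnit.DAYS.toMillis(15),
            TimeUnit.DAYS.toMillis(19));

        // With a 10 day period, only the first two segments are older than the cutoff.
        System.out.println(countExpiredLeadingSegments(lastModifiedMs, nowMs, 10, TimeUnit.DAYS)); // prints 2
    }
}
```

Stopping at the first recent segment means an old segment that follows a recent one is never counted, so the remaining data stays contiguous.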

Cascading policies

The CascadePolicy policy accepts a sequence of policies. It will try each policy in turn and apply the first policy that "covers" a recording, i.e., that has an opinion on what the extent of the recording should be.

It can be configured via properties or programmatically.

For example, to delete recordings that have stopped some time ago, but retain at least 10 GB of data per recording before deleting, you can cascade the DeleteRecordingsAfterPeriodPolicy and RetainLengthPolicy policies.

aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.CascadePolicy
# The inner policies
aeron.data.retention.policies.myPolicy.inner.policies=deleteAfterPolicy,retainLengthPolicy
# For example:
aeron.data.retention.policies.deleteAfterPolicy.class=io.aeron.data.retention.regulator.policies.DeleteRecordingsAfterPeriodPolicy
aeron.data.retention.policies.deleteAfterPolicy.period=10
aeron.data.retention.policies.deleteAfterPolicy.period.unit=DAYS
aeron.data.retention.policies.retainLengthPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.retainLengthPolicy.length=10g

final DataRetentionPolicy innerPolicy1 = somePolicy();
final DataRetentionPolicy innerPolicy2 = someOtherPolicy();

final CascadePolicy.Context context = new CascadePolicy.Context()
    .policies(innerPolicy1, innerPolicy2);

final DataRetentionPolicy policy = new CascadePolicy(context);
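The "first policy that covers a recording wins" behaviour can be sketched with plain functions. In this simplified model a policy is a java.util.function.Function that returns null when it has no opinion; the class CascadeSketch, the instruction strings, and the use of a stream ID as the recording are all hypothetical stand-ins, not the regulator's types.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration of cascading; real policies work on recording metadata, not a bare stream ID.
final class CascadeSketch
{
    /**
     * Try each policy in order and return the first non-null instruction.
     * Returns null when no policy covers the recording.
     */
    static <R, I> I firstCovering(final List<Function<R, I>> policies, final R recording)
    {
        for (final Function<R, I> policy : policies)
        {
            final I instruction = policy.apply(recording);
            if (instruction != null)
            {
                return instruction;
            }
        }

        return null;
    }

    public static void main(final String[] args)
    {
        // The first policy only covers stream 1; the second covers every recording.
        final Function<Integer, String> deleteStreamOne = streamId -> streamId == 1 ? "DELETE_RECORDING" : null;
        final Function<Integer, String> retainLength = streamId -> "RETAIN_LAST_10_GB";
        final List<Function<Integer, String>> cascade = List.of(deleteStreamOne, retainLength);

        System.out.println(firstCovering(cascade, 1)); // prints DELETE_RECORDING
        System.out.println(firstCovering(cascade, 2)); // prints RETAIN_LAST_10_GB
    }
}
```

Order matters in a cascade: placing the catch-all policy first would prevent the more specific policy from ever being consulted.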

How does the regulator work?

The regulator lists the recordings in the archive, then, for each recording, it determines how much data to retain based on its root DataRetentionPolicy.

The DataRetentionPolicy is typically a tree of policies, which forms a coherent strategy for data retention. A policy can be a leaf policy, such as RetainLengthPolicy, or a composite policy, such as RetainMaxOfPolicy.

The job of a policy is to determine how much data to retain for a recording. Given a recording’s metadata, when DataRetentionRegulator#evaluate is called, the policy must decide the following:

  • Whether the policy applies to the recording, e.g., by matching the recording’s channel URI fragment, stream ID, etc.

    • Returning null means the policy does not apply.

  • Whether to adjust the start position of the recording, i.e., whether to detach segments.

  • Whether to delete segments.

  • Whether to delete the entire recording.

These decisions are reflected in the RecordingExtentInstruction returned by the policy.

The regulator then applies the instructions to the recording, which may involve detaching segments, deleting segments, or purging the entire recording. If the policy doesn’t apply to the recording, i.e., when the policy returns null, the regulator will keep the recording as is.

Backing up segment files

Prior to deleting segments or purging an entire recording, the regulator will back up segment files using the SegmentFileBackingUpStrategy supplied in its Context. The default strategy is to perform no backing up. A custom strategy can be provided to back up segment files to a different location, e.g., to a cloud storage service.

If backing up fails to complete immediately, e.g., the strategy throws an exception or returns false to indicate that backing up is asynchronous and has not yet completed, the regulator will not proceed with deleting segments or purging the recording. When the regulator is run again, assuming the root policy still says the segment should be deleted, the regulator will attempt to back up the segment file again.
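The rule that deletion only proceeds after a completed backup can be sketched as a simple gate. This is a model of the behaviour described above, not the regulator's code: the backing-up strategy is represented by a java.util.function.Predicate rather than the real SegmentFileBackingUpStrategy (whose signature is not shown here), the files on disk are represented by a Set, and the segment file name is a made-up example.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical illustration of gating deletion on backup completion.
final class BackupGateSketch
{
    /**
     * Delete the segment file only if the backup reports completion.
     * A false result or an exception leaves the file in place for the next run.
     * Returns true if the file was deleted.
     */
    static boolean deleteIfBackedUp(
        final String segmentFile, final Predicate<String> backUp, final Set<String> segmentFilesOnDisk)
    {
        boolean backedUp;
        try
        {
            backedUp = backUp.test(segmentFile);
        }
        catch (final RuntimeException ex)
        {
            // A failed backup is treated the same as an incomplete one.
            backedUp = false;
        }

        return backedUp && segmentFilesOnDisk.remove(segmentFile);
    }

    public static void main(final String[] args)
    {
        final Set<String> onDisk = new HashSet<>(Set.of("0-0.rec"));

        // Simulates an asynchronous backup that completes on the second attempt.
        final int[] attempts = {0};
        final Predicate<String> asyncBackUp = file -> ++attempts[0] >= 2;

        System.out.println(deleteIfBackedUp("0-0.rec", asyncBackUp, onDisk)); // prints false (first run)
        System.out.println(deleteIfBackedUp("0-0.rec", asyncBackUp, onDisk)); // prints true (second run)
    }
}
```

The second call stands in for the regulator being run again: the backup is attempted once more, and the segment is removed only when it reports completion.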

Performing a "dry run"

The regulator can be run in "dry run" mode, where it will not actually delete any data, but will log what it would have done. This can be useful for testing policies.

To run in "dry run" mode, set the system property aeron.data.retention.dry.run to true, programmatically set DataRetentionRegulator.Context#dryRunListener, or call DataRetentionRegulator#executeDryRun with a DryRunListener.