Data Retention Regulator
Overview
The Data Retention Regulator is a tool for managing the lifetime of data in Aeron Cluster and Aeron Archive to help avoid running out of disk space.
How to get the Data Retention Regulator
The Data Retention Regulator is a Premium Aeron component. The binaries are accessible from the Adaptive Artifactory.
<groupId>io.aeron.premium.drr</groupId>
<artifactId>aeron-data-retention-regulator</artifactId>
Usage
The tool can be run from the command line or programmatically.
CLI usage
java \
--add-opens java.base/jdk.internal.misc=ALL-UNNAMED \
-Daeron.archive.dir=/path/to/archive/dir \
-jar /path/to/aeron-data-retention-regulator-all.jar \
a.properties b.properties
where a.properties and b.properties are files containing system
property values to configure the regulator’s policies.
Note that system property values take precedence over properties in the
files, and property values in "later" files take precedence over those
in "earlier" files, e.g., b.properties takes precedence over
a.properties and -Dfoo.bar=baz takes precedence over both.
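The precedence rules above can be illustrated with plain java.util.Properties. This is a minimal sketch of the layering only, not the regulator's own loading code; the class and method names are hypothetical, and file contents are passed as strings to keep the example self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical illustration of the documented precedence; not part of the regulator's API.
final class PropertyPrecedenceSketch
{
    /**
     * Layer property sources: files in command-line order first, system properties last.
     */
    static Properties merge(final Properties systemProperties, final String... fileContentsInOrder)
    {
        final Properties merged = new Properties();
        for (final String contents : fileContentsInOrder)
        {
            try
            {
                // Loading into the same table means a later file overwrites keys from an earlier one.
                merged.load(new StringReader(contents));
            }
            catch (final IOException ex)
            {
                throw new UncheckedIOException(ex);
            }
        }

        // Values given with -D are applied last, so they override every file.
        merged.putAll(systemProperties);
        return merged;
    }
}
```

With a.properties containing foo.bar=a, b.properties containing foo.bar=b, and -Dfoo.bar=baz on the command line, the merged value of foo.bar is baz.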
Programmatic usage
final DataRetentionRegulator.Context context = new DataRetentionRegulator.Context()
.archiveDir(archiveDir)
.rootPolicy(policy);
try (DataRetentionRegulator regulator = new DataRetentionRegulator(context))
{
regulator.execute();
}
where policy is an instance of DataRetentionPolicy.
Example policies
The data retention regulator uses a root policy to determine what to do with data. Below are some examples of policies that can be used.
Keeping N snapshots worth of data (incl. log) for recovery
The RetainEnoughDataForRecoveryPolicy policy keeps the last N
snapshots and the log following the earliest of these snapshots
available for cluster node recovery.
-
It will detach and delete the log segments prior to the earliest snapshot.
-
It will also delete any snapshot recordings that are no longer needed.
-
It will only consider valid snapshots.
-
Until the recording.log is available, it will specify that all data of all recordings should be retained so that other policies do not "accidentally" delete log data.
-
When the standby directory and cluster identifier are provided, it assumes a TransitionModule is running, and it will determine the snapshots to keep based on the snapshots available in the active recording.log, i.e., from the consensus module’s recording.log when in the CONSENSUS state and from the standby’s recording.log when in the STANDBY state.
-
To run against a ClusterStandby node, without a TransitionModule, the clusterDirName (or cluster.dir) policy property should be set to the standby directory.
It can be configured via properties or programmatically.
When not set via an explicit policy property or programmatically:
-
clusterId will fall back to the system property value aeron.cluster.id,
-
clusterDirName will fall back to the system property value aeron.cluster.dir, and
-
standbyDirName will fall back to the system property value aeron.cluster.standby.dir.
For example, to retain at least 5 snapshots worth of data for recovery.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.cluster.RetainEnoughDataForRecoveryPolicy
# The number of snapshots to keep
aeron.data.retention.policies.myPolicy.snapshot.count=5
# Whether to include snapshots from standby nodes in calculations
aeron.data.retention.policies.myPolicy.include.standby.snapshots=true
# The consensus module directory
aeron.data.retention.policies.myPolicy.cluster.dir=/path/to/consensus
# The (optional) cluster standby directory and cluster id (when running against a node using a TransitionModule)
aeron.data.retention.policies.myPolicy.standby.dir=/path/to/standby
aeron.data.retention.policies.myPolicy.cluster.id=1
final RetainEnoughDataForRecoveryPolicy.Context context = new RetainEnoughDataForRecoveryPolicy.Context()
.snapshotCount(5)
.includeStandbySnapshotsInCalcuations(true)
.clusterDirName("/path/to/consensus")
.standbyDirName("/path/to/standby")
.clusterId(1);
final DataRetentionPolicy policy = new RetainEnoughDataForRecoveryPolicy(context);
Keeping N bytes of data per recording
The RetainLengthPolicy policy keeps at least the last N bytes of data.
It will detach and delete recording segments before the last N bytes.
It can be configured via properties or programmatically.
For example, to retain at least 10 GB of data per recording.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
# The number of bytes to keep
aeron.data.retention.policies.myPolicy.length=10g
final RetainLengthPolicy.Context context = new RetainLengthPolicy.Context()
.length(10 * 1024 * 1024 * 1024L);
final DataRetentionPolicy policy = new RetainLengthPolicy(context);
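Because data is removed in whole segments, keeping "at least" N bytes usually means keeping somewhat more than N. The sketch below shows one way the new start position could be derived. It is a simplified model under stated assumptions, not the policy's actual implementation: the class and method names are hypothetical, and segment boundaries are assumed to fall on multiples of the segment length.

```java
// Hypothetical illustration of segment-aligned retention; not the RetainLengthPolicy implementation.
final class RetainLengthSketch
{
    /**
     * Compute the position from which data would be kept so that at least
     * retainLength bytes remain before stopPosition.
     */
    static long newStartPosition(
        final long startPosition, final long stopPosition, final long segmentLength, final long retainLength)
    {
        // The earliest byte that must survive; never earlier than the current start.
        final long earliestNeeded = Math.max(startPosition, stopPosition - retainLength);

        // Round down to a segment boundary (assumed to be a multiple of segmentLength),
        // because only whole segments can be detached and deleted.
        final long segmentBase = earliestNeeded - (earliestNeeded % segmentLength);

        return Math.max(startPosition, segmentBase);
    }
}
```

For a recording from position 0 to 1000 with 128-byte segments and a retained length of 300, the earliest needed byte is 700, which rounds down to the segment boundary at 640, so 360 bytes are kept.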
Applying policies to particular recordings
The MatchingStreamsPolicy policy applies an inner policy to recordings
that match a channel URI and/or stream identifier.
-
It will only apply the inner policy to recordings that match the channel URI predicate and/or stream ID.
-
When a stream identifier is provided, it will only match recordings that have that stream identifier.
-
When a channel URI predicate is provided, it will only match recordings with original channel URIs that pass the predicate.
-
When both stream identifier and channel URI predicate are provided, it will only match recordings that match both.
-
When using properties, the original.channel.fragment property accepts a regular expression pattern to match part of the original channel URI.
It can be configured via properties or programmatically.
For example, to retain at least 10 GB of data per recording for
recordings with the channel URI fragment alias=foo.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.MatchingStreamsPolicy
# The channel URI fragment to match
aeron.data.retention.policies.myPolicy.original.channel.fragment=alias\=foo(\\||$)
# The stream ID to match
aeron.data.retention.policies.myPolicy.stream.id=1
# The inner policy
aeron.data.retention.policies.myPolicy.inner.policy=myInnerPolicy
# For example:
aeron.data.retention.policies.myInnerPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.myInnerPolicy.length=10g
final DataRetentionPolicy innerPolicy = someInnerPolicy();
final MatchingStreamsPolicy.Context context = new MatchingStreamsPolicy.Context()
.originalChannelFilter(Pattern.compile("alias=foo(\\||$)").asPredicate())
.streamId(1)
.delegate(innerPolicy);
final DataRetentionPolicy policy = new MatchingStreamsPolicy(context);
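To see which channels the pattern from the example selects, it can be tried against a few URIs using only java.util.regex. The channel URIs below are made-up samples and the class name is hypothetical; only the pattern itself comes from the example above.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

// Hypothetical helper for trying out a channel fragment pattern in isolation.
final class ChannelFragmentMatchSketch
{
    // "alias=foo" followed by either the '|' parameter separator or the end of the URI.
    static final Predicate<String> ALIAS_FOO = Pattern.compile("alias=foo(\\||$)").asPredicate();

    static boolean matches(final String originalChannel)
    {
        // asPredicate() uses find(), so the pattern may match anywhere in the URI.
        return ALIAS_FOO.test(originalChannel);
    }
}
```

The trailing `(\\||$)` group is what stops the pattern from also matching an alias such as foobar.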
Applying policies to the Consensus Module’s log recording
The MatchingRecordingLogPolicy policy applies an inner policy to the
Consensus Module’s log recording.
-
Until the recording.log is available, it will specify that all data of all recordings should be retained so that other policies do not "accidentally" delete log data.
-
When the standby directory and cluster identifier are provided, it assumes a TransitionModule is running, and it will determine the recording for the log based on the active recording.log, i.e., from the consensus module’s recording.log when in the CONSENSUS state and from the standby’s recording.log when in the STANDBY state. Normally, these recordings are the same.
-
To run against a ClusterStandby node, without a TransitionModule, the clusterDirName (or cluster.dir) policy property should be set to the standby directory.
It can be configured via properties or programmatically.
When not set via an explicit policy property or programmatically:
-
clusterId will fall back to the system property value aeron.cluster.id,
-
clusterDirName will fall back to the system property value aeron.cluster.dir, and
-
standbyDirName will fall back to the system property value aeron.cluster.standby.dir.
For example, to retain 10 GB of log recording.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.cluster.MatchingRecordingLogPolicy
# The consensus module directory
aeron.data.retention.policies.myPolicy.cluster.dir=/path/to/consensus
# The (optional) cluster standby directory and cluster id (when running against a node using a TransitionModule)
aeron.data.retention.policies.myPolicy.standby.dir=/path/to/standby
aeron.data.retention.policies.myPolicy.cluster.id=1
# The inner policy
aeron.data.retention.policies.myPolicy.inner.policy=retainLengthPolicy
# For example:
aeron.data.retention.policies.retainLengthPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.retainLengthPolicy.length=10g
final DataRetentionPolicy innerPolicy = somePolicy();
final MatchingRecordingLogPolicy.Context context = new MatchingRecordingLogPolicy.Context()
.clusterDirName("/path/to/consensus")
.standbyDirName("/path/to/standby")
.clusterId(1)
.delegate(innerPolicy);
final DataRetentionPolicy policy = new MatchingRecordingLogPolicy(context);
Retaining the maximum of multiple policies
The RetainMaxOfPolicy policy safely combines policies, e.g., to retain
X snapshots worth of cluster data and Y bytes of data per recording, by
retaining the maximum of its inner policies.
It can be configured via properties or programmatically.
For example, to retain at least 5 snapshots worth of data for recovery and at least 10 GB of data per recording.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.RetainMaxOfPolicy
# The inner policies
aeron.data.retention.policies.myPolicy.inner.policies=retainClusterDataPolicy,retainLengthPolicy
# For example:
aeron.data.retention.policies.retainClusterDataPolicy.class=io.aeron.data.retention.regulator.policies.cluster.RetainEnoughDataForRecoveryPolicy
aeron.data.retention.policies.retainClusterDataPolicy.snapshot.count=5
aeron.data.retention.policies.retainClusterDataPolicy.include.standby.snapshots=true
aeron.data.retention.policies.retainClusterDataPolicy.cluster.dir=/path/to/consensus
aeron.data.retention.policies.retainLengthPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.retainLengthPolicy.length=10g
final DataRetentionPolicy innerPolicy1 = somePolicy();
final DataRetentionPolicy innerPolicy2 = someOtherPolicy();
final RetainMaxOfPolicy.Context context = new RetainMaxOfPolicy.Context()
.policies(innerPolicy1, innerPolicy2);
final DataRetentionPolicy policy = new RetainMaxOfPolicy(context);
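One way to picture "retaining the maximum" is in terms of start positions: the inner policy that wants to keep the most data is the one proposing the lowest start position. The sketch below models only that idea. It is an assumption-laden illustration rather than the policy's implementation: the names are hypothetical, a null entry stands for an inner policy with no opinion, and whole-recording deletion is ignored.

```java
import java.util.List;

// Hypothetical model of combining proposals by keeping the most data; not the RetainMaxOfPolicy implementation.
final class RetainMaxOfSketch
{
    /**
     * @param proposedStartPositions one proposal per inner policy, null meaning "no opinion" (an assumption).
     * @return the lowest proposed start position, or null when no inner policy had an opinion.
     */
    static Long lowestStart(final List<Long> proposedStartPositions)
    {
        Long lowest = null;
        for (final Long proposed : proposedStartPositions)
        {
            // A lower start position retains more data, so it wins.
            if (proposed != null && (lowest == null || proposed < lowest))
            {
                lowest = proposed;
            }
        }
        return lowest;
    }
}
```

If a snapshot-based policy proposes keeping data from position 640 and a length-based policy proposes 128, the combined result keeps data from 128, which satisfies both.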
Purging recordings that stopped some time ago
The DeleteRecordingsAfterPeriodPolicy policy deletes recordings that
stopped some period of time ago. Note that, unlike RetainLengthPolicy,
this policy does not specify any retention. It will only specify
recordings to delete. Therefore, when used in conjunction with other
policies, it will not prevent other policies from deleting segments or
recordings before the retention period elapses.
It can be configured via properties or programmatically.
For example, to delete recordings that stopped at least 10 days ago.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.DeleteRecordingsAfterPeriodPolicy
# The period to wait before deleting recordings
aeron.data.retention.policies.myPolicy.period=10
aeron.data.retention.policies.myPolicy.period.unit=DAYS
final DeleteRecordingsAfterPeriodPolicy.Context context = new DeleteRecordingsAfterPeriodPolicy.Context()
.retentionPeriod(10)
.retentionPeriodUnit(TimeUnit.DAYS);
final DataRetentionPolicy policy = new DeleteRecordingsAfterPeriodPolicy(context);
Detaching and deleting old segment files
The DeleteSegmentsAfterPeriodPolicy policy detaches and deletes
recording segment files that have not been modified for some period of
time. It will scan the segment files from the start of the recording
until it finds a segment that has been modified within the retention
period.
The coverage, which is particularly important to consider in cascades, depends on the configured retention mode:
-
NO_RETENTION (default): The policy will only "cover" recordings that have segments due for deletion, i.e., segments last modified before the retention period.
-
RETENTION_UNTIL_DELETION: The policy will "cover" all recordings, i.e., the policy will ensure data is kept for the retention period by proposing a recording start position based on the earliest segment that was modified within the retention period, or, if no segment files were, the most recent segment file.
It can be configured via properties or programmatically.
For example, to retain recording segments for 10 days and then delete them.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.DeleteSegmentsAfterPeriodPolicy
# The period to wait before deleting recording segments
aeron.data.retention.policies.myPolicy.period=10
aeron.data.retention.policies.myPolicy.period.unit=DAYS
aeron.data.retention.policies.myPolicy.retention.mode=RETENTION_UNTIL_DELETION
final DeleteSegmentsAfterPeriodPolicy.Context context = new DeleteSegmentsAfterPeriodPolicy.Context()
.retentionPeriod(10)
.retentionPeriodUnit(TimeUnit.DAYS)
.retentionMode(DeleteSegmentsAfterPeriodPolicy.RetentionMode.RETENTION_UNTIL_DELETION);
final DataRetentionPolicy policy = new DeleteSegmentsAfterPeriodPolicy(context);
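The scan described above can be sketched over a list of last-modified timestamps. This is an illustrative model of the RETENTION_UNTIL_DELETION description, not the policy's code: the names are hypothetical, timestamps are plain milliseconds, and the strict "older than the cutoff" comparison is an assumption.

```java
// Hypothetical model of the segment scan; not the DeleteSegmentsAfterPeriodPolicy implementation.
final class SegmentScanSketch
{
    /**
     * @param lastModifiedMs last-modified times of the segment files, ordered from the start of the recording.
     * @return the index of the first segment to keep; segments before it are due for deletion.
     */
    static int firstSegmentToKeep(final long[] lastModifiedMs, final long nowMs, final long retentionPeriodMs)
    {
        final long cutoffMs = nowMs - retentionPeriodMs;

        int index = 0;
        // Walk from the start and stop at the first segment modified within the retention period.
        while (index < lastModifiedMs.length && lastModifiedMs[index] < cutoffMs)
        {
            index++;
        }

        // If no segment was modified within the period, keep the most recent segment.
        return Math.min(index, Math.max(0, lastModifiedMs.length - 1));
    }
}
```

Note that the scan stops at the first recent segment, so an old segment that sits after a recently modified one is not considered for deletion.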
Cascading policies
The CascadePolicy policy accepts a sequence of policies. It will try
each policy in turn and apply the first policy that "covers" a
recording, i.e., that has an opinion on what the extent of the recording
should be.
It can be configured via properties or programmatically.
For example, to delete recordings that have stopped some time ago, but
retain at least 10 GB of data per recording before deleting, you can
cascade the DeleteRecordingsAfterPeriodPolicy and RetainLengthPolicy policies.
aeron.data.retention.root.policy=myPolicy
# The implementation class
aeron.data.retention.policies.myPolicy.class=io.aeron.data.retention.regulator.policies.CascadePolicy
# The inner policies
aeron.data.retention.policies.myPolicy.inner.policies=deleteAfterPolicy,retainLengthPolicy
# For example:
aeron.data.retention.policies.deleteAfterPolicy.class=io.aeron.data.retention.regulator.policies.DeleteRecordingsAfterPeriodPolicy
aeron.data.retention.policies.deleteAfterPolicy.period=10
aeron.data.retention.policies.deleteAfterPolicy.period.unit=DAYS
aeron.data.retention.policies.retainLengthPolicy.class=io.aeron.data.retention.regulator.policies.RetainLengthPolicy
aeron.data.retention.policies.retainLengthPolicy.length=10g
final DataRetentionPolicy innerPolicy1 = somePolicy();
final DataRetentionPolicy innerPolicy2 = someOtherPolicy();
final CascadePolicy.Context context = new CascadePolicy.Context()
.policies(innerPolicy1, innerPolicy2);
final DataRetentionPolicy policy = new CascadePolicy(context);
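The "first policy that covers a recording wins" rule can be sketched generically. In this simplified model a policy is just a function that returns an instruction, or null when it has no opinion; the class, method, and type parameters are hypothetical stand-ins rather than the regulator's own types.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical model of cascading; not the CascadePolicy implementation.
final class CascadeSketch
{
    /**
     * @param policies  inner policies in priority order, each returning null when it does not cover the recording.
     * @param recording the recording (or its metadata) being evaluated.
     * @return the instruction from the first covering policy, or null when none covers the recording.
     */
    static <R, I> I firstCovering(final List<Function<R, I>> policies, final R recording)
    {
        for (final Function<R, I> policy : policies)
        {
            final I instruction = policy.apply(recording);
            if (instruction != null)
            {
                // Later policies are not consulted once one has an opinion.
                return instruction;
            }
        }
        return null;
    }
}
```

Order matters: in the example above, the delete-after-period policy is listed first, so the length-based policy is only consulted for recordings the first policy does not cover.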
How does the regulator work?
The regulator lists the recordings in the archive, then, for each
recording, it determines how much data to retain based on its root
DataRetentionPolicy.
The DataRetentionPolicy is typically a tree of policies, which forms a
coherent strategy for data retention. A policy can be a leaf policy,
such as RetainLengthPolicy, or a composite policy, such as
RetainMaxOfPolicy.
The job of a policy is to determine how much data to retain for a
recording. Given a recording’s metadata, when
DataRetentionRegulator#evaluate is called, it must decide the
following:
-
Whether the policy applies to the recording, e.g., by matching the recording’s channel URI fragment, stream ID, etc. Returning null means the policy does not apply.
-
Whether to adjust the start position of the recording, i.e., whether to detach segments.
-
Whether to delete segments.
-
Whether to delete the entire recording.
These decisions are reflected in the RecordingExtentInstruction
returned by the policy.
The regulator then applies the instructions to the recording, which may
involve detaching segments, deleting segments, or purging the entire
recording. If the policy doesn’t apply to the recording, i.e., when the
policy returns null, the regulator will keep the recording as is.
Backing up segment files
Prior to deleting segments or purging an entire recording, the regulator
will back up segment files using the SegmentFileBackingUpStrategy
supplied in its Context. The default strategy is to perform no backing
up. A custom strategy can be provided to back up segment files to a
different location, e.g., to a cloud storage service.
If backing up fails to complete immediately, e.g., the strategy throws
an exception or returns false to indicate that backing up is
asynchronous and has not yet completed, the regulator will not proceed
with deleting segments or purging the recording. When the regulator is
run again, assuming the root policy still says the segment should be
deleted, the regulator will attempt to back up the segment file again.
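The "back up first, delete only on success" rule can be sketched as follows. The BackupStrategy interface here is a hypothetical stand-in and does not reproduce the real SegmentFileBackingUpStrategy signature; segment files are represented by plain names, and the deleter is whatever action removes the file.

```java
import java.util.function.Consumer;

// Hypothetical model of gating deletion on a completed backup; not the regulator's implementation.
final class BackupThenDeleteSketch
{
    /** Stand-in for a backing-up strategy (assumed shape, not the real interface). */
    interface BackupStrategy
    {
        /** @return true when the backup has completed, false when it is still in progress. */
        boolean backUp(String segmentFile);
    }

    /**
     * Delete the segment file only if its backup completed on this run.
     *
     * @return true when the file was deleted, false when it was left for a later run.
     */
    static boolean tryDelete(final String segmentFile, final BackupStrategy strategy, final Consumer<String> deleter)
    {
        boolean backedUp;
        try
        {
            backedUp = strategy.backUp(segmentFile);
        }
        catch (final RuntimeException ex)
        {
            // A failed backup is treated like an incomplete one: keep the file.
            backedUp = false;
        }

        if (backedUp)
        {
            deleter.accept(segmentFile);
        }
        return backedUp;
    }
}
```

Since an incomplete backup leaves the file in place, an asynchronous strategy can return false on the first run and true on a later run once the upload has finished.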
Performing a "dry run"
The regulator can be run in "dry run" mode, where it will not actually delete any data, but will log what it would have done. This can be useful for testing policies.
To run in "dry run" mode, set the system property
aeron.data.retention.dry.run to true, programmatically set
DataRetentionRegulator.Context#dryRunListener, or call
DataRetentionRegulator#executeDryRun with a DryRunListener.