Skip to content

Commit

Permalink
[Java] Remove default for ConsensusModule.Context.clusterMembers. Thi…
Browse files Browse the repository at this point in the history
…s has knock on effects for the ClusterBackup. The defaulting in the static block is removed and deriving the consensus channel and catchup endpoint from a supplied cluster member values. This logic is moved into the configuration methods.
  • Loading branch information
mikeb01 committed Sep 9, 2022
1 parent c7d62bf commit f2d4087
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 42 deletions.
87 changes: 60 additions & 27 deletions aeron-cluster/src/main/java/io/aeron/cluster/ClusterBackup.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.agrona.ErrorHandler;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.IoUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.concurrent.*;
import org.agrona.concurrent.errors.DistinctErrorLog;
import org.agrona.concurrent.status.AtomicCounter;
Expand Down Expand Up @@ -277,15 +276,9 @@ public void close()
public static class Configuration
{
/**
* Default which is derived from {@link ConsensusModule.Context#consensusChannel()} with the member endpoint
* added for the consensus channel.
*/
public static final String CONSENSUS_CHANNEL_DEFAULT;

/**
* Member endpoint used for the catchup channel.
* Channel template used for catchup and replication of log and snapshots.
*/
public static final String CATCHUP_ENDPOINT_DEFAULT;
public static final String CLUSTER_BACKUP_CATCHUP_ENDPOINT_PROP_NAME = "aeron.cluster.backup.catchup.endpoint";

/**
* Channel template used for catchup and replication of log and snapshots.
Expand Down Expand Up @@ -339,23 +332,28 @@ public static class Configuration
*/
public static final long CLUSTER_BACKUP_PROGRESS_TIMEOUT_DEFAULT_NS = TimeUnit.SECONDS.toNanos(10);

static
/**
* The value of system property {@link #CLUSTER_BACKUP_CATCHUP_CHANNEL_PROP_NAME} if set, otherwise it will
* try to derive the catchup endpoint from {@link ConsensusModule.Configuration#clusterMembers()} and
* {@link ConsensusModule.Configuration#clusterMemberId()}. Failing that null will be returned.
*
* @return system property {@link #CLUSTER_BACKUP_CATCHUP_CHANNEL_PROP_NAME}, the derived value, or null.
*/
public static String catchupEndpoint()
{
final ClusterMember[] clusterMembers = ClusterMember.parse(ConsensusModule.Configuration.clusterMembers());
final Int2ObjectHashMap<ClusterMember> clusterMemberByIdMap = new Int2ObjectHashMap<>();
String configuredCatchupEndpoint = System.getProperty(CLUSTER_BACKUP_CATCHUP_CHANNEL_PROP_NAME);

ClusterMember.addClusterMemberIds(clusterMembers, clusterMemberByIdMap);

final ClusterMember member = ClusterMember.determineMember(
clusterMembers,
ConsensusModule.Configuration.clusterMemberId(),
ConsensusModule.Configuration.memberEndpoints());
if (null == configuredCatchupEndpoint && null != ConsensusModule.Configuration.clusterMembers())
{
final ClusterMember member = ClusterMember.determineMember(
ClusterMember.parse(ConsensusModule.Configuration.clusterMembers()),
ConsensusModule.Configuration.clusterMemberId(),
ConsensusModule.Configuration.memberEndpoints());

final ChannelUri consensusUri = ChannelUri.parse(ConsensusModule.Configuration.consensusChannel());
consensusUri.put(ENDPOINT_PARAM_NAME, member.consensusEndpoint());
configuredCatchupEndpoint = member.catchupEndpoint();
}

CONSENSUS_CHANNEL_DEFAULT = consensusUri.toString();
CATCHUP_ENDPOINT_DEFAULT = member.catchupEndpoint();
return configuredCatchupEndpoint;
}

/**
Expand All @@ -370,6 +368,36 @@ public static String catchupChannel()
return System.getProperty(CLUSTER_BACKUP_CATCHUP_CHANNEL_PROP_NAME, CLUSTER_BACKUP_CATCHUP_CHANNEL_DEFAULT);
}

/**
* The value of system property {@link ConsensusModule.Configuration#consensusChannel()} if set. If that channel
* does not have an endpoint set, then this will try to derive one using
* {@link ConsensusModule.Configuration#clusterMembers()} and
* {@link ConsensusModule.Configuration#clusterMemberId()}.
*
* @return system property {@link #CLUSTER_BACKUP_CATCHUP_CHANNEL_PROP_NAME}, the derived value, or null.
*/
public static String consensusChannel()
{
String consensusChannel = ConsensusModule.Configuration.consensusChannel();

if (null != consensusChannel && null != ConsensusModule.Configuration.clusterMembers())
{
final ChannelUri consensusUri = ChannelUri.parse(consensusChannel);
if (!consensusUri.containsKey(ENDPOINT_PARAM_NAME))
{
final ClusterMember member = ClusterMember.determineMember(
ClusterMember.parse(ConsensusModule.Configuration.clusterMembers()),
ConsensusModule.Configuration.clusterMemberId(),
ConsensusModule.Configuration.memberEndpoints());

consensusUri.put(ENDPOINT_PARAM_NAME, member.consensusEndpoint());
consensusChannel = consensusUri.toString();
}
}

return consensusChannel;
}

/**
* Interval at which a cluster backup will send backup queries.
*
Expand Down Expand Up @@ -432,12 +460,12 @@ public static class Context
private Aeron aeron;

private int clusterId = ClusteredServiceContainer.Configuration.clusterId();
private String consensusChannel = Configuration.CONSENSUS_CHANNEL_DEFAULT;
private String consensusChannel = Configuration.consensusChannel();
private int consensusStreamId = ConsensusModule.Configuration.consensusStreamId();
private int consensusModuleSnapshotStreamId = ConsensusModule.Configuration.snapshotStreamId();
private int serviceSnapshotStreamId = ClusteredServiceContainer.Configuration.snapshotStreamId();
private int logStreamId = ConsensusModule.Configuration.logStreamId();
private String catchupEndpoint = Configuration.CATCHUP_ENDPOINT_DEFAULT;
private String catchupEndpoint = Configuration.catchupEndpoint();
private String catchupChannel = Configuration.catchupChannel();

private long clusterBackupIntervalNs = Configuration.clusterBackupIntervalNs();
Expand Down Expand Up @@ -511,6 +539,11 @@ public void conclude()
IoUtil.delete(clusterDir, false);
}

if (null == catchupEndpoint)
{
throw new ClusterException("ClusterBackup.Context.catchupEndpoint must be set");
}

if (!clusterDir.exists() && !clusterDir.mkdirs())
{
throw new ClusterException("failed to create cluster dir: " + clusterDir.getAbsolutePath());
Expand Down Expand Up @@ -1131,11 +1164,11 @@ public int logStreamId()
}

/**
* Set the catchup endpoint to use for log retrieval.
* Set the endpoint that will be subscribed to in order to receive logs and snapshots.
*
* @param catchupEndpoint to use for the log retrieval.
* @return catchup endpoint to use for the log retrieval.
* @see Configuration#CATCHUP_ENDPOINT_DEFAULT
* @see Configuration#catchupEndpoint()
*/
public Context catchupEndpoint(final String catchupEndpoint)
{
Expand All @@ -1147,7 +1180,7 @@ public Context catchupEndpoint(final String catchupEndpoint)
* Get the catchup endpoint to use for log retrieval.
*
* @return catchup endpoint to use for the log retrieval.
* @see Configuration#CATCHUP_ENDPOINT_DEFAULT
* @see Configuration#catchupEndpoint()
*/
public String catchupEndpoint()
{
Expand Down
19 changes: 8 additions & 11 deletions aeron-cluster/src/main/java/io/aeron/cluster/ConsensusModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -364,12 +364,6 @@ public static final class Configuration
*/
public static final String CLUSTER_MEMBERS_PROP_NAME = "aeron.cluster.members";

/**
* Default property for the list of cluster member endpoints.
*/
public static final String CLUSTER_MEMBERS_DEFAULT =
"0,localhost:20000,localhost:20001,localhost:20002,localhost:0,localhost:8010";

/**
* Property name for the comma separated list of cluster consensus endpoints used for adding passive
* followers as well as dynamic join of a cluster.
Expand Down Expand Up @@ -824,15 +818,13 @@ public static int appointedLeaderId()
}

/**
* The value {@link #CLUSTER_MEMBERS_DEFAULT} or system property
* {@link #CLUSTER_MEMBERS_PROP_NAME} if set.
* The value of system property {@link #CLUSTER_MEMBERS_PROP_NAME} if set, null otherwise.
*
* @return {@link #CLUSTER_MEMBERS_DEFAULT} or system property
* {@link #CLUSTER_MEMBERS_PROP_NAME} if set.
* @return of system property {@link #CLUSTER_MEMBERS_PROP_NAME} if set.
*/
public static String clusterMembers()
{
return System.getProperty(CLUSTER_MEMBERS_PROP_NAME, CLUSTER_MEMBERS_DEFAULT);
return System.getProperty(CLUSTER_MEMBERS_PROP_NAME);
}

/**
Expand Down Expand Up @@ -1361,6 +1353,11 @@ public void conclude()
clusterDir = new File(clusterDirectoryName);
}

if (null == clusterMembers)
{
throw new ClusterException("ConsensusModule.Context.clusterMembers must be set");
}

if (deleteDirOnStart)
{
IoUtil.delete(clusterDir, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.aeron.security.AuthorisationService;
import io.aeron.security.DefaultAuthenticatorSupplier;
import io.aeron.status.ReadableCounter;
import io.aeron.test.TestContexts;
import io.aeron.test.Tests;
import io.aeron.test.cluster.TestClusterClock;
import org.agrona.collections.MutableLong;
Expand Down Expand Up @@ -68,7 +69,7 @@ public class ConsensusModuleAgentTest
private final Counter mockTimedOutClientCounter = mock(Counter.class);
private final LongConsumer mockTimeConsumer = mock(LongConsumer.class);

private final ConsensusModule.Context ctx = new ConsensusModule.Context()
private final ConsensusModule.Context ctx = TestContexts.localhostConsensusModule()
.errorHandler(Tests::onError)
.errorCounter(mock(AtomicCounter.class))
.moduleStateCounter(mock(Counter.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.aeron.exceptions.ConfigurationException;
import io.aeron.security.AuthorisationService;
import io.aeron.security.AuthorisationServiceSupplier;
import io.aeron.test.TestContexts;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.status.AtomicCounter;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -56,7 +57,7 @@ void beforeEach()
when(aeron.context()).thenReturn(aeronContext);
when(aeron.conductorAgentInvoker()).thenReturn(conductorInvoker);

context = new ConsensusModule.Context()
context = TestContexts.localhostConsensusModule()
.clusterDir(clusterDir)
.aeron(aeron)
.errorCounter(mock(AtomicCounter.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void shouldSupportTwoSingleNodeClusters()

try (ArchivingMediaDriver ignore = ArchivingMediaDriver.launch(driverCtx, archiveCtx))
{
final ConsensusModule.Context moduleCtx0 = new ConsensusModule.Context()
final ConsensusModule.Context moduleCtx0 = TestContexts.localhostConsensusModule()
.clusterId(0)
.deleteDirOnStart(true)
.clusterDir(new File(SystemUtil.tmpDirName(), "cluster-0-0"))
Expand All @@ -86,7 +86,7 @@ void shouldSupportTwoSingleNodeClusters()
.serviceStreamId(moduleCtx0.serviceStreamId())
.consensusModuleStreamId(moduleCtx0.consensusModuleStreamId());

final ConsensusModule.Context moduleCtx1 = new ConsensusModule.Context()
final ConsensusModule.Context moduleCtx1 = TestContexts.localhostConsensusModule()
.clusterId(1)
.deleteDirOnStart(true)
.clusterDir(new File(SystemUtil.tmpDirName(), "cluster-0-1"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@

import io.aeron.archive.Archive;
import io.aeron.archive.client.AeronArchive;
import io.aeron.cluster.ConsensusModule;

public class TestContexts
{
public static final String LOCALHOST_REPLICATION_CHANNEL = "aeron:udp?endpoint=localhost:0";
public static final String LOCALHOST_CONTROL_REQUEST_CHANNEL = "aeron:udp?endpoint=localhost:8010";
public static final String LOCALHOST_CONTROL_RESPONSE_CHANNEL = "aeron:udp?endpoint=localhost:0";
public static final String LOCALHOST_SINGLE_HOST_CLUSTER_MEMBERS =
"0,localhost:20000,localhost:20001,localhost:20002,localhost:0,localhost:8010";

public static Archive.Context localhostArchive()
{
Expand All @@ -38,4 +41,9 @@ public static AeronArchive.Context localhostAeronArchive()
.controlResponseChannel(LOCALHOST_CONTROL_RESPONSE_CHANNEL);
}

public static ConsensusModule.Context localhostConsensusModule()
{
return new ConsensusModule.Context()
.clusterMembers(LOCALHOST_SINGLE_HOST_CLUSTER_MEMBERS);
}
}

0 comments on commit f2d4087

Please sign in to comment.