Skip to content

Commit

Permalink
Adding additional debug and trace logs in cluster state fetch path an…
Browse files Browse the repository at this point in the history
…d replication path between leader and follower cluster

Signed-off-by: Nishant Goel <nisgoel@amazon.com>
  • Loading branch information
nisgoel-amazon committed Sep 30, 2024
1 parent c67051d commit cb8c77f
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,10 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus

@Suppress("BlockingMethodInNonBlockingContext")
override fun asyncShardOperation(request: GetChangesRequest, shardId: ShardId, listener: ActionListener<GetChangesResponse>) {
log.debug("calling asyncShardOperation method")
GlobalScope.launch(threadPool.coroutineContext(REPLICATION_EXECUTOR_NAME_LEADER)) {
// TODO: Figure out if we need to acquire a primary permit here
log.debug("$REPLICATION_EXECUTOR_NAME_LEADER coroutine has initiated")
listener.completeWith {
var relativeStartNanos = System.nanoTime()
remoteStatsService.stats[shardId] = remoteStatsService.stats.getOrDefault(shardId, RemoteShardMetric())
Expand All @@ -82,7 +84,9 @@ class TransportGetChangesAction @Inject constructor(threadPool: ThreadPool, clus
// There are no new operations to sync. Do a long poll and wait for GlobalCheckpoint to advance. If
// the checkpoint doesn't advance by the timeout this throws an ESTimeoutException which the caller
// should catch and start a new poll.
log.trace("Waiting for global checkpoint to advance from ${request.fromSeqNo} Sequence Number")
val gcp = indexShard.waitForGlobalCheckpoint(request.fromSeqNo, WAIT_FOR_NEW_OPS_TIMEOUT)
log.trace("Waiting for global checkpoint to advance is finished for ${request.fromSeqNo} Sequence Number")

// At this point indexShard.lastKnownGlobalCheckpoint has advanced but it may not yet have been synced
// to the translog, which means we can't return those changes. Return to the caller to retry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
// Any checks on the settings is followed by setup checks to ensure all relevant changes are
// present across the plugins
// validate index metadata on the leader cluster
log.debug("Fetching leader cluster state for ${request.leaderIndex} index.")
val leaderClusterState = getLeaderClusterState(request.leaderAlias, request.leaderIndex)
ValidationUtil.validateLeaderIndexState(request.leaderAlias, request.leaderIndex, leaderClusterState)

val leaderSettings = getLeaderIndexSettings(request.leaderAlias, request.leaderIndex)
log.debug("Leader settings were fetched for ${request.leaderIndex} index.")

if (leaderSettings.keySet().contains(ReplicationPlugin.REPLICATED_INDEX_SETTING.key) and
!leaderSettings.get(ReplicationPlugin.REPLICATED_INDEX_SETTING.key).isNullOrBlank()) {
Expand All @@ -113,7 +115,9 @@ class TransportReplicateIndexAction @Inject constructor(transportService: Transp
// Setup checks are successful and trigger replication for the index
// permissions evaluation to trigger replication is based on the current security context set
val internalReq = ReplicateIndexClusterManagerNodeRequest(user, request)
log.debug("Starting replication index action on current master node")
client.suspendExecute(ReplicateIndexClusterManagerNodeAction.INSTANCE, internalReq)
log.debug("Response of start replication action is returned")
ReplicateIndexResponse(true)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
throw OpenSearchStatusException("[FORBIDDEN] Replication START block is set", RestStatus.FORBIDDEN)
}

log.debug("Making request to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster")
val remoteMetadata = getRemoteIndexMetadata(replicateIndexReq.leaderAlias, replicateIndexReq.leaderIndex)
log.debug("Response returned of the request made to get metadata of ${replicateIndexReq.leaderIndex} index on remote cluster")

if (state.routingTable.hasIndex(replicateIndexReq.followerIndex)) {
throw IllegalArgumentException("Cant use same index again for replication. " +
Expand All @@ -115,6 +117,7 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
ReplicationOverallState.RUNNING, user, replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.FOLLOWER_CLUSTER_ROLE, null),
replicateIndexReq.useRoles?.getOrDefault(ReplicateIndexRequest.LEADER_CLUSTER_ROLE, null), replicateIndexReq.settings)

log.debug("Starting index replication task in persistent task service with name: replication:index:${replicateIndexReq.followerIndex}")
val task = persistentTasksService.startTask("replication:index:${replicateIndexReq.followerIndex}",
IndexReplicationExecutor.TASK_NAME, params)

Expand All @@ -123,13 +126,15 @@ class TransportReplicateIndexClusterManagerNodeAction @Inject constructor(transp
listener.onResponse(ReplicateIndexResponse(false))
}

log.debug("Waiting for persistent task to move to following state")
// Now wait for the replication to start and the follower index to get created before returning
persistentTasksService.waitForTaskCondition(task.id, replicateIndexReq.timeout()) { t ->
val replicationState = (t.state as IndexReplicationState?)?.state
replicationState == ReplicationState.FOLLOWING ||
(!replicateIndexReq.waitForRestore && replicationState == ReplicationState.RESTORING)
}

log.debug("Persistent task is moved to following replication state")
listener.onResponse(AcknowledgedResponse(true))
} catch (e: Exception) {
log.error("Failed to trigger replication for ${replicateIndexReq.followerIndex} - ${e.stackTraceToString()}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ class AutoFollowTask(id: Long, type: String, action: String, description: String
if (!response.isAcknowledged) {
throw ReplicationException("Failed to auto follow leader index $leaderIndex")
}
log.debug("Auto follow has started replication from ${leaderAlias}:$leaderIndex -> $leaderIndex")
successStart = true
} catch (e: OpenSearchSecurityException) {
// For permission related failures, Adding as part of failed indices as autofollow role doesn't have required permissions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class ShardReplicationTask(id: Long, type: String, action: String, description:
logDebug("Cluster metadata listener invoked on shard task...")
if (event.metadataChanged()) {
val replicationStateParams = getReplicationStateParamsForIndex(clusterService, followerShardId.indexName)
logDebug("Replication State Params are fetched from cluster state")
if (replicationStateParams == null) {
if (PersistentTasksNodeService.Status(State.STARTED) == status)
cancelTask("Shard replication task received an interrupt.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ suspend fun <Req: ActionRequest, Resp: ActionResponse> Client.suspendExecuteWith
var retryException: Exception
repeat(numberOfRetries - 1) { index ->
try {
log.debug("Sending get changes request after ${currentBackoff / 1000} seconds.")
return suspendExecute(replicationMetadata, action, req,
injectSecurityContext = injectSecurityContext, defaultContext = defaultContext)
} catch (e: OpenSearchException) {
Expand Down

0 comments on commit cb8c77f

Please sign in to comment.