Skip to content

Commit

Permalink
Add logic in master service to optimize performance and retain detail…
Browse files Browse the repository at this point in the history
…ed logging for critical cluster operations. (opensearch-project#16421)

Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
Signed-off-by: shwetathareja <shwetathareja@live.com>
Co-authored-by: shwetathareja <shwetathareja@live.com>
  • Loading branch information
sumitasr and shwetathareja authored Oct 26, 2024
1 parent b2d537a commit 6f1b59e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,33 +299,37 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);
final String summary;
if (logger.isTraceEnabled()) {
summary = taskInputs.taskSummaryGenerator.apply(true);
} else {
summary = taskInputs.taskSummaryGenerator.apply(false);
}

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
return;
}

if (logger.isTraceEnabled()) {
logger.trace("executing cluster state update for [{}]", longSummary);
logger.trace("executing cluster state update for [{}]", summary);
} else {
logger.debug("executing cluster state update for [{}]", shortSummary);
logger.debug("executing cluster state update for [{}]", summary);
}

final ClusterState previousClusterState = state();

if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
taskInputs.onNoLongerClusterManager();
return;
}

final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", shortSummary);
logExecutionTime(computationTime, "compute cluster state update", summary);

clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateComputeHistogram,
Expand All @@ -337,25 +341,25 @@ private void runTasks(TaskInputs taskInputs) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
} else {
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info(
"{}, term: {}, version: {}, delta: {}",
shortSummary,
summary,
newClusterState.term(),
newClusterState.version(),
nodesDeltaSummary
Expand All @@ -366,7 +370,7 @@ private void runTasks(TaskInputs taskInputs) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(shortSummary, publicationStartTime, newClusterState, e);
handleException(summary, publicationStartTime, newClusterState, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,37 @@ void runIfNotProcessed(BatchedTask updateTask) {
if (toExecute.isEmpty() == false) {
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey, toExecute.size());
final List<BatchedTask> sampleTasks = toExecute.stream()
.limit(Math.min(1000, toExecute.size()))
.collect(Collectors.toList());
return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks));
}
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
return getSummary(updateTask, toExecute);
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}

private String buildShortSummary(final Object batchingKey, final int taskCount) {
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
private String getSummary(final BatchedTask updateTask, final List<BatchedTask> toExecute) {
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}

private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) {
return "Tasks batched with key: "
+ batchingKey.toString().split("\\$")[0]
+ ", count:"
+ taskCount
+ " and sample tasks: "
+ sampleTasks;
}

/**
Expand Down
Loading

0 comments on commit 6f1b59e

Please sign in to comment.