diff --git a/server/src/main/java/org/opensearch/index/reindex/BulkByScrollTask.java b/server/src/main/java/org/opensearch/index/reindex/BulkByScrollTask.java index 0de49a920f03a..e792adb9a32f9 100644 --- a/server/src/main/java/org/opensearch/index/reindex/BulkByScrollTask.java +++ b/server/src/main/java/org/opensearch/index/reindex/BulkByScrollTask.java @@ -122,7 +122,7 @@ public TaskInfo taskInfoGivenSubtaskInfo(String localNodeId, List slic sliceStatuses.set(status.getSliceId(), new BulkByScrollTask.StatusOrException(status)); } Status status = leaderState.getStatus(sliceStatuses); - return taskInfo(localNodeId, getDescription(), status, null); + return taskInfo(localNodeId, getDescription(), status); } private BulkByScrollTask.Status emptyStatus() { diff --git a/server/src/main/java/org/opensearch/tasks/ResourceStatsType.java b/server/src/main/java/org/opensearch/tasks/ResourceStatsType.java index f28352a3e479e..c670ac5ba689c 100644 --- a/server/src/main/java/org/opensearch/tasks/ResourceStatsType.java +++ b/server/src/main/java/org/opensearch/tasks/ResourceStatsType.java @@ -10,7 +10,7 @@ /** Defines the different types of resource stats. */ public enum ResourceStatsType { - // Resource stats of the worker thread that is reported directly from runnable. + // resource stats of the worker thread reported directly from runnable. WORKER_STATS("worker_stats", false); private final String statsType; diff --git a/server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java b/server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java index 16f59741ef7a6..ae58f712b63c2 100644 --- a/server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java +++ b/server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java @@ -11,13 +11,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Collections; import java.util.EnumMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** * Thread resource usage information for particular resource stats type. *

- * It captures the resource usage information about a particular execution of thread + * It captures the resource usage information like memory, CPU about a particular execution of thread * for a specific stats type. */ public class ResourceUsageInfo { @@ -32,11 +34,16 @@ public ResourceUsageInfo(ResourceUsageMetric... resourceUsageMetrics) { public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) { for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) { - ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats()); - if (resourceStatsInfo == null) { - statsInfo.put(resourceUsageMetric.getStats(), new ResourceStatsInfo(resourceUsageMetric.getValue())); - } else { + final ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats()); + if (resourceStatsInfo != null) { updateResourceUsageInfo(resourceStatsInfo, resourceUsageMetric); + } else { + throw new IllegalStateException( + "cannot update [" + + resourceUsageMetric.getStats().toString() + + "] entry as its not present current_stats_info:" + + statsInfo + ); } } } @@ -49,7 +56,7 @@ private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, Resour newEndValue = resourceUsageMetric.getValue(); if (currentEndValue > newEndValue) { logger.debug( - "Dropping resource usage update as the new value is lower than current value [" + "dropping resource usage update as the new value is lower than current value [" + "resource_stats=[{}], " + "current_end_value={}, " + "new_end_value={}]", @@ -61,15 +68,15 @@ private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, Resour } } while (!resourceStatsInfo.endValue.compareAndSet(currentEndValue, newEndValue)); logger.debug( - "Updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]", + "updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]", resourceUsageMetric.getStats(), currentEndValue, newEndValue ); } - public EnumMap getStatsInfo() { - return statsInfo; + public Map getStatsInfo() { + return Collections.unmodifiableMap(statsInfo); } @Override diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index 5529c910bfced..62453d08724ce 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -42,6 +42,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -122,7 +123,7 @@ public Task( * generate data? */ public final TaskInfo taskInfo(String localNodeId, boolean detailed) { - return taskInfo(localNodeId, detailed, !detailed); + return taskInfo(localNodeId, detailed, detailed == false); } /** @@ -144,7 +145,7 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS status = getStatus(); } if (excludeStats == false) { - resourceStats = new TaskResourceStats(new HashMap() { + resourceStats = new TaskResourceStats(new HashMap<>() { { put(TOTAL, getTotalResourceStats()); } @@ -153,6 +154,13 @@ private TaskInfo taskInfo(String localNodeId, boolean detailed, boolean excludeS return taskInfo(localNodeId, description, status, resourceStats); } + /** + * Build a {@link TaskInfo} for this task without resource stats. + */ + protected final TaskInfo taskInfo(String localNodeId, String description, Status status) { + return taskInfo(localNodeId, description, status, null); + } + /** * Build a proper {@link TaskInfo} for this task. */ @@ -236,11 +244,13 @@ public Status getStatus() { * Returns thread level resource consumption of the task */ public Map> getResourceStats() { - return resourceStats; + return Collections.unmodifiableMap(resourceStats); } /** - * Returns total resource usage of the task + * Returns current total resource usage of the task. + * Currently, this method is only called on demand, during get and listing of tasks. + * In the future, these values can be cached as an optimization. */ public TaskResourceUsage getTotalResourceStats() { return new TaskResourceUsage(getTotalResourceUtilization(ResourceStats.CPU), getTotalResourceUtilization(ResourceStats.MEMORY)); @@ -249,14 +259,13 @@ public TaskResourceUsage getTotalResourceStats() { /** * Returns total resource consumption for a specific task stat. */ - public long getTotalResourceUtilization(ResourceStats taskStats) { + public long getTotalResourceUtilization(ResourceStats stats) { long totalResourceConsumption = 0L; for (List threadResourceInfosList : resourceStats.values()) { for (ThreadResourceInfo threadResourceInfo : threadResourceInfosList) { - for (Map.Entry entry : threadResourceInfo.getResourceUsageInfos().entrySet()) { - if (entry.getKey().isOnlyForAnalysis() == false) { - totalResourceConsumption += entry.getValue().getStatsInfo().get(taskStats).getTotalValue(); - } + final ResourceUsageInfo.ResourceStatsInfo statsInfo = threadResourceInfo.getResourceUsageInfo().getStatsInfo().get(stats); + if (threadResourceInfo.getStatsType().isOnlyForAnalysis() == false && statsInfo != null) { + totalResourceConsumption += statsInfo.getTotalValue(); } } } @@ -264,67 +273,67 @@ public long getTotalResourceUtilization(ResourceStats taskStats) { } /** - * Adds thread's resource consumption information + * Adds thread's starting resource consumption information * @param threadId ID of the thread * @param statsType stats type * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException matching active thread entry was found which is not expected. */ public void startThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { - if (statsType != ResourceStatsType.WORKER_STATS) { - throw new IllegalArgumentException("Adding thread resource information should always have WORKER_STATS as stats type"); - } final List threadResourceInfoList = resourceStats.computeIfAbsent(threadId, k -> new ArrayList<>()); - // active thread entry should not be present in the list. + // active thread entry should not be present in the list for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { - if (threadResourceInfo.isActive()) { - throw new IllegalStateException("Unexpected active thread entry is present"); + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + throw new IllegalStateException( + "unexpected active thread resource entry present [" + threadId + "]:[" + threadResourceInfo + "]" + ); } } - threadResourceInfoList.add(new ThreadResourceInfo(ResourceStatsType.WORKER_STATS, resourceUsageMetrics)); + threadResourceInfoList.add(new ThreadResourceInfo(statsType, resourceUsageMetrics)); } /** * This method is used to update the resource consumption stats so that the data isn't too stale for long-running task. - * If an active thread entry is not present in the list, the update is dropped. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. * @param threadId ID of the thread * @param statsType stats type * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. */ public void updateThreadResourceStats(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { final List threadResourceInfoList = resourceStats.get(threadId); - if (threadResourceInfoList == null) { - throw new IllegalStateException("Cannot update if thread resource info is not present"); - } else { - // If active entry is not present, the update is dropped. If present, the active entry is updated. + if (threadResourceInfoList != null) { for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { - if (threadResourceInfo.isActive()) { - threadResourceInfo.updateResourceInfo(statsType, resourceUsageMetrics); + // the active entry present in the list is updated + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); return; } } } + throw new IllegalStateException("cannot update if active thread resource entry is not present"); } /** * Record the thread's final resource consumption values. + * If active thread entry is present in the list, the entry is updated. If one is not found, it throws an exception. * @param threadId ID of the thread * @param statsType stats type * @param resourceUsageMetrics resource consumption metrics of the thread + * @throws IllegalStateException if no matching active thread entry was found. */ public void stopThreadResourceTracking(long threadId, ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { final List threadResourceInfoList = resourceStats.get(threadId); - if (statsType != ResourceStatsType.WORKER_STATS || threadResourceInfoList == null) { - throw new IllegalArgumentException( - "Recording the end should have WORKER_STATS as stats type" + "and an active entry should be present in the list" - ); - } - // marking active entries as done before updating the final resource usage values. - for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { - if (threadResourceInfo.isActive()) { - threadResourceInfo.setActive(false); - threadResourceInfo.updateResourceInfo(ResourceStatsType.WORKER_STATS, resourceUsageMetrics); + if (threadResourceInfoList != null) { + for (ThreadResourceInfo threadResourceInfo : threadResourceInfoList) { + if (threadResourceInfo.getStatsType() == statsType && threadResourceInfo.isActive()) { + threadResourceInfo.setActive(false); + threadResourceInfo.recordResourceUsageMetrics(resourceUsageMetrics); + return; + } } } + throw new IllegalStateException("cannot update final values if active thread resource entry is not present"); } /** diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java b/server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java index 1a31af29d1442..6af3de2b78c06 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceUsage.java @@ -30,6 +30,9 @@ * information of running tasks. */ public class TaskResourceUsage implements Writeable, ToXContentFragment { + private static final ParseField CPU_TIME_IN_NANOS = new ParseField("cpu_time_in_nanos"); + private static final ParseField MEMORY_IN_BYTES = new ParseField("memory_in_bytes"); + private final long cpuTimeInNanos; private final long memoryInBytes; @@ -61,8 +64,8 @@ public long getMemoryInBytes() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.field(ResourceStats.CPU.toString(), cpuTimeInNanos); - builder.field(ResourceStats.MEMORY.toString(), memoryInBytes); + builder.field(CPU_TIME_IN_NANOS.getPreferredName(), cpuTimeInNanos); + builder.field(MEMORY_IN_BYTES.getPreferredName(), memoryInBytes); return builder; } @@ -72,8 +75,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws ); static { - PARSER.declareLong(constructorArg(), new ParseField(ResourceStats.CPU.toString())); - PARSER.declareLong(constructorArg(), new ParseField(ResourceStats.MEMORY.toString())); + PARSER.declareLong(constructorArg(), CPU_TIME_IN_NANOS); + PARSER.declareLong(constructorArg(), MEMORY_IN_BYTES); } public static TaskResourceUsage fromXContent(XContentParser parser) { diff --git a/server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java b/server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java index 8c1f976e6b571..8b45c38c8fb63 100644 --- a/server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java +++ b/server/src/main/java/org/opensearch/tasks/ThreadResourceInfo.java @@ -8,45 +8,47 @@ package org.opensearch.tasks; -import java.util.EnumMap; - /** * Resource consumption information about a particular execution of thread. *

- * It captures the resource usage information about a particular execution of thread. - * across different stats type like worker_stats or response_stats etc., + * It captures the resource usage information about a particular execution of thread + * for a specific stats type like worker_stats or response_stats etc., */ public class ThreadResourceInfo { - private final EnumMap resourceUsageInfos = new EnumMap<>(ResourceStatsType.class); private volatile boolean isActive = true; + private final ResourceStatsType statsType; + private final ResourceUsageInfo resourceUsageInfo; public ThreadResourceInfo(ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { - this.resourceUsageInfos.put(statsType, new ResourceUsageInfo(resourceUsageMetrics)); + this.statsType = statsType; + this.resourceUsageInfo = new ResourceUsageInfo(resourceUsageMetrics); } - public void updateResourceInfo(ResourceStatsType statsType, ResourceUsageMetric... resourceUsageMetrics) { - ResourceUsageInfo resourceUsageInfo = resourceUsageInfos.get(statsType); - if (resourceUsageInfo == null) { - resourceUsageInfos.put(statsType, new ResourceUsageInfo()); - } else { - resourceUsageInfo.recordResourceUsageMetrics(resourceUsageMetrics); - } + /** + * Updates thread's resource consumption information. + */ + public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) { + resourceUsageInfo.recordResourceUsageMetrics(resourceUsageMetrics); } - public EnumMap getResourceUsageInfos() { - return resourceUsageInfos; + public void setActive(boolean isActive) { + this.isActive = isActive; } public boolean isActive() { return isActive; } - public boolean setActive(boolean active) { - return isActive = active; + public ResourceStatsType getStatsType() { + return statsType; + } + + public ResourceUsageInfo getResourceUsageInfo() { + return resourceUsageInfo; } @Override public String toString() { - return resourceUsageInfos + ", is_active=" + isActive; + return resourceUsageInfo + ", stats_type=" + statsType + ", is_active=" + isActive; } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java index 6f1ba438618d6..45db94577f15f 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java @@ -34,8 +34,6 @@ import org.opensearch.action.search.SearchAction; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.xcontent.XContentHelper; -import org.opensearch.tasks.TaskResourceStats; -import org.opensearch.tasks.TaskResourceUsage; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.tasks.TaskInfo; @@ -46,9 +44,10 @@ import java.nio.charset.StandardCharsets; import java.util.Collections; -import java.util.HashMap; import java.util.Map; +import static org.opensearch.tasks.TaskInfoTests.randomResourceStats; + public class TaskTests extends OpenSearchTestCase { public void testTaskInfoToString() { @@ -58,7 +57,6 @@ public void testTaskInfoToString() { long runningTime = randomNonNegativeLong(); boolean cancellable = false; boolean cancelled = false; - TaskResourceStats resourceStats = randomResourceStats(); TaskInfo taskInfo = new TaskInfo( new TaskId(nodeId, taskId), "test_type", @@ -71,7 +69,7 @@ public void testTaskInfoToString() { cancelled, TaskId.EMPTY_TASK_ID, Collections.singletonMap("foo", "bar"), - resourceStats + randomResourceStats(randomBoolean()) ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); @@ -105,7 +103,7 @@ public void testCancellableOptionWhenCancelledTrue() { cancelled, TaskId.EMPTY_TASK_ID, Collections.singletonMap("foo", "bar"), - randomResourceStats() + randomResourceStats(randomBoolean()) ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); @@ -132,7 +130,7 @@ public void testCancellableOptionWhenCancelledFalse() { cancelled, TaskId.EMPTY_TASK_ID, Collections.singletonMap("foo", "bar"), - randomResourceStats() + randomResourceStats(randomBoolean()) ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); @@ -161,7 +159,7 @@ public void testNonCancellableOption() { cancelled, TaskId.EMPTY_TASK_ID, Collections.singletonMap("foo", "bar"), - randomResourceStats() + randomResourceStats(randomBoolean()) ) ); assertEquals(e.getMessage(), "task cannot be cancelled"); @@ -180,7 +178,7 @@ public void testTaskResourceStats() { long totalMemory = 0L; long totalCPU = 0L; - // reporting resource consumption events + // reporting resource consumption events and checking total consumption values for (int i = 0; i < randomInt(10); i++) { long initial_memory = randomLongBetween(1, 100); long initial_cpu = randomLongBetween(1, 100); @@ -199,17 +197,36 @@ public void testTaskResourceStats() { ResourceUsageMetric[] taskResourceMetrics = new ResourceUsageMetric[] { new ResourceUsageMetric(ResourceStats.MEMORY, memory), new ResourceUsageMetric(ResourceStats.CPU, cpu) }; - task.stopThreadResourceTracking(i, ResourceStatsType.WORKER_STATS, taskResourceMetrics); + task.updateThreadResourceStats(i, ResourceStatsType.WORKER_STATS, taskResourceMetrics); + task.stopThreadResourceTracking(i, ResourceStatsType.WORKER_STATS); } assertEquals(task.getTotalResourceStats().getMemoryInBytes(), totalMemory); assertEquals(task.getTotalResourceStats().getCpuTimeInNanos(), totalCPU); - } - private TaskResourceStats randomResourceStats() { - return false ? null : new TaskResourceStats(new HashMap() { - { - put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong())); - } - }); + // updating should throw an IllegalStateException when active entry is not present. + try { + task.updateThreadResourceStats(randomInt(), ResourceStatsType.WORKER_STATS); + fail("update should not be successful as active entry is not present!"); + } catch (IllegalStateException e) { + // pass + } + + // re-adding a thread entry that is already present, should throw an exception + int threadId = randomInt(); + task.startThreadResourceTracking(threadId, ResourceStatsType.WORKER_STATS, new ResourceUsageMetric(ResourceStats.MEMORY, 100)); + try { + task.startThreadResourceTracking(threadId, ResourceStatsType.WORKER_STATS); + fail("add/start should not be successful as active entry is already present!"); + } catch (IllegalStateException e) { + // pass + } + + // existing active entry is present only for memory, update cannot be called with cpu values. + try { + task.updateThreadResourceStats(threadId, ResourceStatsType.WORKER_STATS, new ResourceUsageMetric(ResourceStats.CPU, 200)); + fail("update should not be successful as entry for CPU is not present!"); + } catch (IllegalStateException e) { + // pass + } } } diff --git a/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java b/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java index 5ae9eb04f29fc..c0ec4ca3d31fd 100644 --- a/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java +++ b/server/src/test/java/org/opensearch/tasks/CancelTasksResponseTests.java @@ -135,4 +135,5 @@ private static CancelTasksResponse createTestInstanceWithFailures() { } return new CancelTasksResponse(randomTasks(), taskFailures, nodeFailures); } + } diff --git a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java index 06cf42dd09d6e..7c8cb3230659b 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java @@ -281,8 +281,8 @@ static TaskInfo randomTaskInfo(boolean detailed) { String action = randomAlphaOfLength(5); Task.Status status = detailed ? randomRawTaskStatus() : null; String description = detailed ? randomAlphaOfLength(5) : null; - long startTime = randomNonNegativeLong(); - long runningTimeNanos = randomNonNegativeLong(); + long startTime = randomLong(); + long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); boolean cancelled = cancellable == true ? randomBoolean() : false; TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); @@ -323,7 +323,7 @@ private static RawTaskStatus randomRawTaskStatus() { } } - private static TaskResourceStats randomResourceStats(boolean detailed) { + public static TaskResourceStats randomResourceStats(boolean detailed) { return detailed ? new TaskResourceStats(new HashMap() { { for (int i = 0; i < randomInt(5); i++) {