diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 057a970470b15..d38eb03fae3dd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -160,11 +160,13 @@ public static String threadName(Settings settings, String namePrefix) { if (Node.NODE_NAME_SETTING.exists(settings)) { return threadName(Node.NODE_NAME_SETTING.get(settings), namePrefix); } else { + // TODO this should only be allowed in tests return threadName("", namePrefix); } } public static String threadName(final String nodeName, final String namePrefix) { + // TODO missing node names should only be allowed in tests return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]"; } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 39346fecbef25..5c097ba774f4a 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -233,7 +233,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon @Override protected void doStop() { - ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown")); + ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown")); // Copy indices because we modify it asynchronously in the body of the loop final Set indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java index d299406aad06c..29b1e6323bf4e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java @@ -119,7 +119,7 @@ public LicenseService(Settings settings, ClusterService clusterService, Clock cl super(settings); this.clusterService = clusterService; this.clock = clock; - this.scheduler = new SchedulerEngine(clock); + this.scheduler = new SchedulerEngine(settings, clock); this.licenseState = licenseState; this.operationModeFileWatcher = new OperationModeFileWatcher(resourceWatcherService, XPackPlugin.resolveConfigFile(env, "license_mode"), logger, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index e405c5e4e007e..ffc0257313b36 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.scheduler; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -92,9 +93,9 @@ public interface Schedule { private final Clock clock; private final List listeners = new CopyOnWriteArrayList<>(); - public SchedulerEngine(Clock clock) { + public SchedulerEngine(Settings settings, Clock clock) { this.clock = clock; - this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler")); + this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler")); } public void register(Listener listener) { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 0fc4d838f7ce8..09b2ccd079a76 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -194,7 +194,7 @@ public List> getPersistentTasksExecutor(ClusterServic return emptyList(); } - SchedulerEngine schedulerEngine = new SchedulerEngine(getClock()); + SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock()); return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool)); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 13290f09e8eb8..66943dd9d2fa1 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -47,6 +47,7 @@ public class RollupJobTaskTests extends ESTestCase { + private static final Settings SETTINGS = Settings.builder().put("node_name", "test").build(); private static ThreadPool pool = new TestThreadPool("test"); @AfterClass @@ -62,7 +63,7 @@ public void testInitialStatusStopped() { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -75,7 +76,7 @@ public void testInitialStatusAborting() { RollupJobStatus status = new RollupJobStatus(IndexerState.ABORTING, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -88,7 +89,7 @@ public void testInitialStatusStopping() { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPING, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -101,7 +102,7 @@ public void testInitialStatusStarted() { RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -114,7 +115,7 @@ public void testInitialStatusIndexingOldID() { RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), false); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -128,7 +129,7 @@ public void testInitialStatusIndexingNewID() { RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), true); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -141,7 +142,7 @@ public void testNoInitialStatus() { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -154,7 +155,7 @@ public void testStartWhenStarted() throws InterruptedException { RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -641,7 +642,7 @@ public void testStopWhenStopped() throws InterruptedException { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -748,7 +749,7 @@ public void testStopWhenAborting() throws InterruptedException { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); CountDownLatch latch = new CountDownLatch(2);