From c364bb3f5280c5c9c61f3606bae77161c8a3c81f Mon Sep 17 00:00:00 2001 From: Nik Everett Date: Thu, 9 Aug 2018 13:18:44 -0400 Subject: [PATCH] Logging: Use settings when building daemon threads Subclasses of `EsIntegTestCase` run multiple Elasticsearch nodes in the same JVM and when we log we look at the name of the thread to figure out the node name. This makes sure that all calls to `daemonThreadFactory` include the node name. Closes #32574 I'd like to follow this up with more drastic changes that make it impossible to do this incorrectly but that change is much larger than this and I'd like to get these log lines fixed up sooner rather than later. --- .../common/util/concurrent/EsExecutors.java | 2 ++ .../elasticsearch/indices/IndicesService.java | 2 +- .../elasticsearch/license/LicenseService.java | 2 +- .../xpack/core/scheduler/SchedulerEngine.java | 5 +++-- .../elasticsearch/xpack/rollup/Rollup.java | 2 +- .../xpack/rollup/job/RollupJobTaskTests.java | 21 ++++++++++--------- 6 files changed, 19 insertions(+), 15 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 057a970470b15..d38eb03fae3dd 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -160,11 +160,13 @@ public static String threadName(Settings settings, String namePrefix) { if (Node.NODE_NAME_SETTING.exists(settings)) { return threadName(Node.NODE_NAME_SETTING.get(settings), namePrefix); } else { + // TODO this should only be allowed in tests return threadName("", namePrefix); } } public static String threadName(final String nodeName, final String namePrefix) { + // TODO missing node names should only be allowed in tests return "elasticsearch" + (nodeName.isEmpty() ? "" : "[") + nodeName + (nodeName.isEmpty() ? "" : "]") + "[" + namePrefix + "]"; } diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 39346fecbef25..5c097ba774f4a 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -233,7 +233,7 @@ public void onRemoval(ShardId shardId, String fieldName, boolean wasEvicted, lon @Override protected void doStop() { - ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory("indices_shutdown")); + ExecutorService indicesStopExecutor = Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown")); // Copy indices because we modify it asynchronously in the body of the loop final Set indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java index d299406aad06c..29b1e6323bf4e 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java @@ -119,7 +119,7 @@ public LicenseService(Settings settings, ClusterService clusterService, Clock cl super(settings); this.clusterService = clusterService; this.clock = clock; - this.scheduler = new SchedulerEngine(clock); + this.scheduler = new SchedulerEngine(settings, clock); this.licenseState = licenseState; this.operationModeFileWatcher = new OperationModeFileWatcher(resourceWatcherService, XPackPlugin.resolveConfigFile(env, "license_mode"), logger, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java index e405c5e4e007e..ffc0257313b36 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/scheduler/SchedulerEngine.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.scheduler; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.FutureUtils; @@ -92,9 +93,9 @@ public interface Schedule { private final Clock clock; private final List listeners = new CopyOnWriteArrayList<>(); - public SchedulerEngine(Clock clock) { + public SchedulerEngine(Settings settings, Clock clock) { this.clock = clock; - this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory("trigger_engine_scheduler")); + this.scheduler = Executors.newScheduledThreadPool(1, EsExecutors.daemonThreadFactory(settings, "trigger_engine_scheduler")); } public void register(Listener listener) { diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java index 0fc4d838f7ce8..09b2ccd079a76 100644 --- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java +++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/Rollup.java @@ -194,7 +194,7 @@ public List> getPersistentTasksExecutor(ClusterServic return emptyList(); } - SchedulerEngine schedulerEngine = new SchedulerEngine(getClock()); + SchedulerEngine schedulerEngine = new SchedulerEngine(settings, getClock()); return Collections.singletonList(new RollupJobTask.RollupJobPersistentTasksExecutor(settings, client, schedulerEngine, threadPool)); } diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java index 13290f09e8eb8..66943dd9d2fa1 100644 --- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java +++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/job/RollupJobTaskTests.java @@ -47,6 +47,7 @@ public class RollupJobTaskTests extends ESTestCase { + private static final Settings SETTINGS = Settings.builder().put("node_name", "test").build(); private static ThreadPool pool = new TestThreadPool("test"); @AfterClass @@ -62,7 +63,7 @@ public void testInitialStatusStopped() { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -75,7 +76,7 @@ public void testInitialStatusAborting() { RollupJobStatus status = new RollupJobStatus(IndexerState.ABORTING, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -88,7 +89,7 @@ public void testInitialStatusStopping() { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPING, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -101,7 +102,7 @@ public void testInitialStatusStarted() { RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -114,7 +115,7 @@ public void testInitialStatusIndexingOldID() { RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), false); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -128,7 +129,7 @@ public void testInitialStatusIndexingNewID() { RollupJobStatus status = new RollupJobStatus(IndexerState.INDEXING, Collections.singletonMap("foo", "bar"), true); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -141,7 +142,7 @@ public void testNoInitialStatus() { RollupJob job = new RollupJob(ConfigTestHelpers.randomRollupJobConfig(random()), Collections.emptyMap()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, null, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -154,7 +155,7 @@ public void testStartWhenStarted() throws InterruptedException { RollupJobStatus status = new RollupJobStatus(IndexerState.STARTED, Collections.singletonMap("foo", "bar"), randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STARTED)); @@ -641,7 +642,7 @@ public void testStopWhenStopped() throws InterruptedException { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); RollupJobTask task = new RollupJobTask(1, "type", "action", new TaskId("node", 123), job, status, client, schedulerEngine, pool, Collections.emptyMap()); assertThat(((RollupJobStatus)task.getStatus()).getIndexerState(), equalTo(IndexerState.STOPPED)); @@ -748,7 +749,7 @@ public void testStopWhenAborting() throws InterruptedException { RollupJobStatus status = new RollupJobStatus(IndexerState.STOPPED, null, randomBoolean()); Client client = mock(Client.class); when(client.settings()).thenReturn(Settings.EMPTY); - SchedulerEngine schedulerEngine = new SchedulerEngine(Clock.systemUTC()); + SchedulerEngine schedulerEngine = new SchedulerEngine(SETTINGS, Clock.systemUTC()); CountDownLatch latch = new CountDownLatch(2);