Skip to content

Commit

Permalink
Send client headers from TransportClient (#30803)
Browse files Browse the repository at this point in the history
This change adds a simple header to the transport client
that is present on the servers thread context that ensures
we can detect if a transport client talks to the server in a
specific request. This change also adds a header for xpack
to detect if the client has xpack installed.
  • Loading branch information
s1monw authored May 24, 2018
1 parent 2c7559c commit 0bdfb5c
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.client.transport;

import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
Expand Down Expand Up @@ -129,7 +130,8 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
providedSettings = Settings.builder().put(providedSettings).put(Node.NODE_NAME_SETTING.getKey(), "_client_").build();
}
final PluginsService pluginsService = newPluginService(providedSettings, plugins);
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).build();
final Settings settings = Settings.builder().put(defaultSettings).put(pluginsService.updatedSettings()).put(ThreadContext.PREFIX
+ "." + "transport_client", true).build();
final List<Closeable> resourcesToClose = new ArrayList<>();
final ThreadPool threadPool = new ThreadPool(settings);
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,7 @@ public final void messageReceived(BytesReference reference, TcpChannel channel)
streamIn = new NamedWriteableAwareStreamInput(streamIn, namedWriteableRegistry);
streamIn.setVersion(version);
threadPool.getThreadContext().readHeaders(streamIn);
threadPool.getThreadContext().putTransient("_remote_address", remoteAddress);
if (TransportStatus.isRequest(status)) {
handleRequest(channel, profileName, streamIn, requestId, messageLengthBytes, version, remoteAddress, status);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,15 @@ public void testOverrideHeader() throws Exception {

protected static void assertHeaders(Map<String, String> headers, Map<String, String> expected) {
assertNotNull(headers);
headers = new HashMap<>(headers);
headers.remove("transport_client"); // default header on TPC
assertEquals(expected.size(), headers.size());
for (Map.Entry<String, String> expectedEntry : expected.entrySet()) {
assertEquals(headers.get(expectedEntry.getKey()), expectedEntry.getValue());
}
}

protected static void assertHeaders(ThreadPool pool) {
Map<String, String> headers = new HashMap<>();
Settings asSettings = HEADER_SETTINGS.getAsSettings(ThreadContext.PREFIX);
assertHeaders(pool.getThreadContext().getHeaders(),
asSettings.keySet().stream().collect(Collectors.toMap(Function.identity(), k -> asSettings.get(k))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -63,13 +64,29 @@ public void testPluginNamedWriteablesRegistered() {
}
}

public void testDefaultHeaderContainsPlugins() {
Settings baseSettings = Settings.builder()
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir())
.build();
try (TransportClient client = new MockTransportClient(baseSettings, Arrays.asList(MockPlugin.class))) {
ThreadContext threadContext = client.threadPool().getThreadContext();
assertEquals("true", threadContext.getHeader("transport_client"));
assertEquals("true", threadContext.getHeader("test"));
}
}

public static class MockPlugin extends Plugin {

@Override
public List<Entry> getNamedWriteables() {
return Arrays.asList(new Entry[]{ new Entry(MockNamedWriteable.class, MockNamedWriteable.NAME, MockNamedWriteable::new)});
}

@Override
public Settings additionalSettings() {
return Settings.builder().put(ThreadContext.PREFIX + "." + "test", true).build();
}

public class MockNamedWriteable implements NamedWriteable {

static final String NAME = "mockNamedWritable";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.license.DeleteLicenseAction;
Expand Down Expand Up @@ -193,6 +194,7 @@ static Settings additionalSettings(final Settings settings, final boolean enable
final Settings.Builder builder = Settings.builder();
builder.put(SecuritySettings.addTransportSettings(settings));
builder.put(SecuritySettings.addUserSettings(settings));
builder.put(ThreadContext.PREFIX + "." + "has_xpack", true);
return builder.build();
} else {
return Settings.EMPTY;
Expand Down

0 comments on commit 0bdfb5c

Please sign in to comment.