Skip to content

Commit

Permalink
De-mockify history store tests
Browse files Browse the repository at this point in the history
  • Loading branch information
gwbrown committed May 7, 2019
1 parent 38deb63 commit ea8b159
Showing 1 changed file with 84 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,55 +6,71 @@

package org.elasticsearch.xpack.core.snapshotlifecycle.history;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleType;
import org.elasticsearch.xpack.core.indexlifecycle.TimeseriesLifecycleType;
import org.elasticsearch.xpack.core.snapshotlifecycle.SnapshotLifecyclePolicy;
import org.junit.After;
import org.junit.Before;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING;
import static org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotHistoryStore.getHistoryIndexNameForTime;
import static org.elasticsearch.xpack.core.snapshotlifecycle.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class SnapshotHistoryStoreTests extends ESTestCase {

private ThreadPool threadPool;
private ClusterService clusterService;
private Client client;
private SnapshotLifecycleTemplateRegistryTests.VerifyingClient client;
private SnapshotHistoryStore historyStore;
private SnapshotLifecycleTemplateRegistry registry;

@Before
public void setup() {
Settings settings = Settings.builder().put("node.name", randomAlphaOfLength(10)).build();
client = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class);
when(client.threadPool()).thenReturn(threadPool);
when(client.settings()).thenReturn(settings);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(settings));
clusterService = mock(ClusterService.class);
registry = mock(SnapshotLifecycleTemplateRegistry.class);

historyStore = new SnapshotHistoryStore(settings, client, ZoneOffset.UTC, clusterService, registry);
threadPool = new TestThreadPool(this.getClass().getName());
client = new SnapshotLifecycleTemplateRegistryTests.VerifyingClient(threadPool);
clusterService = ClusterServiceUtils.createClusterService(threadPool);
List<NamedXContentRegistry.Entry> entries = new ArrayList<>(ClusterModule.getNamedXWriteables());
entries.addAll(Arrays.asList(
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
(p) -> TimeseriesLifecycleType.INSTANCE),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)));
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(entries);
registry = new SnapshotLifecycleTemplateRegistry(Settings.EMPTY, clusterService, threadPool, client, xContentRegistry);

historyStore = new SnapshotHistoryStore(Settings.EMPTY, client, ZoneOffset.UTC, clusterService, registry);
}

@After
@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testNoActionIfDisabled() {
Expand All @@ -67,12 +83,15 @@ public void testNoActionIfDisabled() {
String snapshotId = policy.generateSnapshotName(context);
SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId);

client.setVerifier((a,r,l) -> {
fail("the history store is disabled, no action should have been taken");
return null;
});
disabledHistoryStore.putAsync(record);
verify(client, times(0)).index(any(), any());
}

@SuppressWarnings("unchecked")
public void testPut() throws IOException {
public void testPut() throws Exception {
String policyId = randomAlphaOfLength(5);
SnapshotLifecyclePolicy policy = randomSnapshotLifecyclePolicy(policyId);
final long timestamp = randomNonNegativeLong();
Expand All @@ -81,40 +100,55 @@ public void testPut() throws IOException {
{
SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId);

AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> {
calledTimes.incrementAndGet();
assertThat(action, instanceOf(IndexAction.class));
assertThat(request, instanceOf(IndexRequest.class));
IndexRequest indexRequest = (IndexRequest) request;
assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)), indexRequest.index());
final String indexedDocument = indexRequest.source().utf8ToString();
assertThat(indexedDocument, containsString(policy.getId()));
assertThat(indexedDocument, containsString(policy.getRepository()));
assertThat(indexedDocument, containsString(snapshotId));
if (policy.getConfig() != null) {
assertContainsMap(indexedDocument, policy.getConfig());
}
assertNotNull(listener);
return new IndexResponse();
});

historyStore.putAsync(record);
ArgumentCaptor<IndexRequest> indexRequest = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, times(1)).index(indexRequest.capture(), notNull(ActionListener.class));

assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)),
indexRequest.getValue().index());
final String indexedDocument = indexRequest.getValue().source().utf8ToString();
assertThat(indexedDocument, containsString(policy.getId()));
assertThat(indexedDocument, containsString(policy.getRepository()));
assertThat(indexedDocument, containsString(snapshotId));
if (policy.getConfig() != null) {
assertContainsMap(indexedDocument, policy.getConfig());
}
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
}

{
final String cause = randomAlphaOfLength(9);
Exception failureException = new RuntimeException(cause);
SnapshotHistoryItem record = SnapshotHistoryItem.failureRecord(timestamp, policy, snapshotId, failureException);

AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> {
calledTimes.incrementAndGet();
assertThat(action, instanceOf(IndexAction.class));
assertThat(request, instanceOf(IndexRequest.class));
IndexRequest indexRequest = (IndexRequest) request;
assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)), indexRequest.index());
final String indexedDocument = indexRequest.source().utf8ToString();
assertThat(indexedDocument, containsString(policy.getId()));
assertThat(indexedDocument, containsString(policy.getRepository()));
assertThat(indexedDocument, containsString(snapshotId));
if (policy.getConfig() != null) {
assertContainsMap(indexedDocument, policy.getConfig());
}
assertThat(indexedDocument, containsString("runtime_exception"));
assertThat(indexedDocument, containsString(cause));
assertNotNull(listener);
return new IndexResponse();
});

historyStore.putAsync(record);
ArgumentCaptor<IndexRequest> indexRequest = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, times(2)).index(indexRequest.capture(), notNull(ActionListener.class));

assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)),
indexRequest.getValue().index());
final String indexedDocument = indexRequest.getValue().source().utf8ToString();
assertThat(indexedDocument, containsString(policy.getId()));
assertThat(indexedDocument, containsString(policy.getRepository()));
assertThat(indexedDocument, containsString(snapshotId));
if (policy.getConfig() != null) {
assertContainsMap(indexedDocument, policy.getConfig());
}
assertThat(indexedDocument, containsString("runtime_exception"));
assertThat(indexedDocument, containsString(cause));
assertBusy(() -> assertThat(calledTimes.get(), equalTo(1)));
}
}

Expand Down

0 comments on commit ea8b159

Please sign in to comment.