diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index 4eaf71f9c689d..cfddb88b87d93 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -161,10 +161,10 @@ public String toString() { public static final class Response extends ActionResponse { - private long indexMetadataVersion; + private long mappingVersion; - public long getIndexMetadataVersion() { - return indexMetadataVersion; + public long getMappingVersion() { + return mappingVersion; } private long globalCheckpoint; @@ -188,8 +188,8 @@ public Translog.Operation[] getOperations() { Response() { } - Response(final long indexMetadataVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) { - this.indexMetadataVersion = indexMetadataVersion; + Response(final long mappingVersion, final long globalCheckpoint, final long maxSeqNo, final Translog.Operation[] operations) { + this.mappingVersion = mappingVersion; this.globalCheckpoint = globalCheckpoint; this.maxSeqNo = maxSeqNo; this.operations = operations; @@ -198,7 +198,7 @@ public Translog.Operation[] getOperations() { @Override public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); - indexMetadataVersion = in.readVLong(); + mappingVersion = in.readVLong(); globalCheckpoint = in.readZLong(); maxSeqNo = in.readZLong(); operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new); @@ -207,7 +207,7 @@ public void readFrom(final StreamInput in) throws IOException { @Override public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); - out.writeVLong(indexMetadataVersion); + out.writeVLong(mappingVersion); out.writeZLong(globalCheckpoint); out.writeZLong(maxSeqNo); out.writeArray(Translog.Operation::writeOperation, operations); @@ -218,7 +218,7 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; final Response that = (Response) o; - return indexMetadataVersion == that.indexMetadataVersion && + return mappingVersion == that.mappingVersion && globalCheckpoint == that.globalCheckpoint && maxSeqNo == that.maxSeqNo && Arrays.equals(operations, that.operations); @@ -226,7 +226,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(indexMetadataVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); + return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations)); } } @@ -252,7 +252,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); IndexShard indexShard = indexService.getShard(request.getShard().id()); final SeqNoStats seqNoStats = indexShard.seqNoStats(); - final long indexMetaDataVersion = clusterService.state().metaData().index(shardId.getIndex()).getVersion(); + final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); final Translog.Operation[] operations = getOperations( indexShard, @@ -260,7 +260,7 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc request.fromSeqNo, request.maxOperationCount, request.maxOperationSizeInBytes); - return new Response(indexMetaDataVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); + return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java index f2b5b7b3772d2..6854a9f5741b7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java @@ -80,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask { private long followerMaxSeqNo = 0; private int numConcurrentReads = 0; private int numConcurrentWrites = 0; - private long currentIndexMetadataVersion = 0; + private long currentMappingVersion = 0; private long totalFetchTimeMillis = 0; private long numberOfSuccessfulFetches = 0; private long numberOfFailedFetches = 0; @@ -131,14 +131,13 @@ void start( this.lastRequestedSeqNo = followerGlobalCheckpoint; } - // Forcefully updates follower mapping, this gets us the leader imd version and - // makes sure that leader and follower mapping are identical. - updateMapping(imdVersion -> { + // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical + updateMapping(mappingVersion -> { synchronized (ShardFollowNodeTask.this) { - currentIndexMetadataVersion = imdVersion; + currentMappingVersion = mappingVersion; } - LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, indexMetaDataVersion={}", - params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, imdVersion); + LOGGER.info("{} Started to follow leader shard {}, followGlobalCheckPoint={}, mappingVersion={}", + params.getFollowShardId(), params.getLeaderShardId(), followerGlobalCheckpoint, mappingVersion); coordinateReads(); }); } @@ -258,7 +257,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR } void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Response response) { - maybeUpdateMapping(response.getIndexMetadataVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); + maybeUpdateMapping(response.getMappingVersion(), () -> innerHandleReadResponse(from, maxRequiredSeqNo, response)); } /** Called when some operations are fetched from the leading */ @@ -344,16 +343,16 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse coordinateReads(); } - private synchronized void maybeUpdateMapping(Long minimumRequiredIndexMetadataVersion, Runnable task) { - if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion) { - LOGGER.trace("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]", - params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); + private synchronized void maybeUpdateMapping(Long minimumRequiredMappingVersion, Runnable task) { + if (currentMappingVersion >= minimumRequiredMappingVersion) { + LOGGER.trace("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]", + params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); task.run(); } else { - LOGGER.trace("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]", - params.getFollowShardId(), currentIndexMetadataVersion, minimumRequiredIndexMetadataVersion); - updateMapping(imdVersion -> { - currentIndexMetadataVersion = imdVersion; + LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]", + params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion); + updateMapping(mappingVersion -> { + currentMappingVersion = mappingVersion; task.run(); }); } @@ -422,7 +421,7 @@ public synchronized Status getStatus() { numConcurrentReads, numConcurrentWrites, buffer.size(), - currentIndexMetadataVersion, + currentMappingVersion, totalFetchTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, @@ -448,7 +447,7 @@ public static class Status implements Task.Status { static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField("number_of_concurrent_reads"); static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField("number_of_concurrent_writes"); static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField("number_of_queued_writes"); - static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField("index_metadata_version"); + static final ParseField MAPPING_VERSION_FIELD = new ParseField("mapping_version"); static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField("total_fetch_time_millis"); static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField("number_of_successful_fetches"); static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField("number_of_failed_fetches"); @@ -504,7 +503,7 @@ public static class Status implements Task.Status { STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_READS_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_CONCURRENT_WRITES_FIELD); STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), NUMBER_OF_QUEUED_WRITES_FIELD); - STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), INDEX_METADATA_VERSION_FIELD); + STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), MAPPING_VERSION_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), TOTAL_FETCH_TIME_MILLIS_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD); STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), NUMBER_OF_FAILED_FETCHES_FIELD); @@ -582,10 +581,10 @@ public int numberOfQueuedWrites() { return numberOfQueuedWrites; } - private final long indexMetadataVersion; + private final long mappingVersion; - public long indexMetadataVersion() { - return indexMetadataVersion; + public long mappingVersion() { + return mappingVersion; } private final long totalFetchTimeMillis; @@ -658,7 +657,7 @@ public NavigableMap fetchExceptions() { final int numberOfConcurrentReads, final int numberOfConcurrentWrites, final int numberOfQueuedWrites, - final long indexMetadataVersion, + final long mappingVersion, final long totalFetchTimeMillis, final long numberOfSuccessfulFetches, final long numberOfFailedFetches, @@ -678,7 +677,7 @@ public NavigableMap fetchExceptions() { this.numberOfConcurrentReads = numberOfConcurrentReads; this.numberOfConcurrentWrites = numberOfConcurrentWrites; this.numberOfQueuedWrites = numberOfQueuedWrites; - this.indexMetadataVersion = indexMetadataVersion; + this.mappingVersion = mappingVersion; this.totalFetchTimeMillis = totalFetchTimeMillis; this.numberOfSuccessfulFetches = numberOfSuccessfulFetches; this.numberOfFailedFetches = numberOfFailedFetches; @@ -701,7 +700,7 @@ public Status(final StreamInput in) throws IOException { this.numberOfConcurrentReads = in.readVInt(); this.numberOfConcurrentWrites = in.readVInt(); this.numberOfQueuedWrites = in.readVInt(); - this.indexMetadataVersion = in.readVLong(); + this.mappingVersion = in.readVLong(); this.totalFetchTimeMillis = in.readVLong(); this.numberOfSuccessfulFetches = in.readVLong(); this.numberOfFailedFetches = in.readVLong(); @@ -730,7 +729,7 @@ public void writeTo(final StreamOutput out) throws IOException { out.writeVInt(numberOfConcurrentReads); out.writeVInt(numberOfConcurrentWrites); out.writeVInt(numberOfQueuedWrites); - out.writeVLong(indexMetadataVersion); + out.writeVLong(mappingVersion); out.writeVLong(totalFetchTimeMillis); out.writeVLong(numberOfSuccessfulFetches); out.writeVLong(numberOfFailedFetches); @@ -756,7 +755,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa builder.field(NUMBER_OF_CONCURRENT_READS_FIELD.getPreferredName(), numberOfConcurrentReads); builder.field(NUMBER_OF_CONCURRENT_WRITES_FIELD.getPreferredName(), numberOfConcurrentWrites); builder.field(NUMBER_OF_QUEUED_WRITES_FIELD.getPreferredName(), numberOfQueuedWrites); - builder.field(INDEX_METADATA_VERSION_FIELD.getPreferredName(), indexMetadataVersion); + builder.field(MAPPING_VERSION_FIELD.getPreferredName(), mappingVersion); builder.humanReadableField( TOTAL_FETCH_TIME_MILLIS_FIELD.getPreferredName(), "total_fetch_time", @@ -815,7 +814,7 @@ public boolean equals(final Object o) { numberOfConcurrentReads == that.numberOfConcurrentReads && numberOfConcurrentWrites == that.numberOfConcurrentWrites && numberOfQueuedWrites == that.numberOfQueuedWrites && - indexMetadataVersion == that.indexMetadataVersion && + mappingVersion == that.mappingVersion && totalFetchTimeMillis == that.totalFetchTimeMillis && numberOfSuccessfulFetches == that.numberOfSuccessfulFetches && numberOfFailedFetches == that.numberOfFailedFetches && @@ -837,7 +836,7 @@ public int hashCode() { numberOfConcurrentReads, numberOfConcurrentWrites, numberOfQueuedWrites, - indexMetadataVersion, + mappingVersion, totalFetchTimeMillis, numberOfSuccessfulFetches, numberOfFailedFetches, diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java index 34ba3a2e5c691..83e3e4806e184 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java @@ -115,7 +115,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro putMappingRequest.type(mappingMetaData.type()); putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON); followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap( - putMappingResponse -> handler.accept(indexMetaData.getVersion()), + putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()), errorHandler)); }, errorHandler)); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java index 8e150b8f934e4..e9c67097d72b2 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesResponseTests.java @@ -12,7 +12,7 @@ public class ShardChangesResponseTests extends AbstractStreamableTestCase fromToSlot = new HashMap<>(); @Override protected void innerUpdateMapping(LongConsumer handler, Consumer errorHandler) { - handler.accept(indexMetadataVersion); + handler.accept(mappingVersion); } @Override @@ -134,7 +134,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co fromToSlot.put(from, ++slot); // if too many invocations occur with the same from then AOBE occurs, this ok and then something is wrong. } - indexMetadataVersion = testResponse.indexMetadataVersion; + mappingVersion = testResponse.mappingVersion; if (testResponse.exception != null) { errorHandler.accept(testResponse.exception); } else { @@ -187,15 +187,15 @@ private void tearDown() { }; } - private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVersion, int maxOperationCount) { + private static TestRun createTestRun(long startSeqNo, long startMappingVersion, int maxOperationCount) { long prevGlobalCheckpoint = startSeqNo; - long indexMetaDataVersion = startIndexMetadataVersion; + long mappingVersion = startMappingVersion; int numResponses = randomIntBetween(16, 256); Map> responses = new HashMap<>(numResponses); for (int i = 0; i < numResponses; i++) { long nextGlobalCheckPoint = prevGlobalCheckpoint + maxOperationCount; if (sometimes()) { - indexMetaDataVersion++; + mappingVersion++; } if (sometimes()) { @@ -203,7 +203,7 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, indexMetaDataVersion, null)); + item.add(new TestResponse(error, mappingVersion, null)); } List ops = new ArrayList<>(); for (long seqNo = prevGlobalCheckpoint; seqNo <= nextGlobalCheckPoint; seqNo++) { @@ -211,8 +211,8 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer byte[] source = "{}".getBytes(StandardCharsets.UTF_8); ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } - item.add(new TestResponse(null, indexMetaDataVersion, - new ShardChangesAction.Response(indexMetaDataVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY)))); + item.add(new TestResponse(null, mappingVersion, + new ShardChangesAction.Response(mappingVersion, nextGlobalCheckPoint, nextGlobalCheckPoint, ops.toArray(EMPTY)))); responses.put(prevGlobalCheckpoint, item); } else { // Simulates a leader shard copy not having all the operations the shard follow task thinks it has by @@ -224,13 +224,13 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer // Sometimes add a random retryable error if (sometimes()) { Exception error = new UnavailableShardsException(new ShardId("test", "test", 0), ""); - item.add(new TestResponse(error, indexMetaDataVersion, null)); + item.add(new TestResponse(error, mappingVersion, null)); } // Sometimes add an empty shard changes response to also simulate a leader shard lagging behind if (sometimes()) { ShardChangesAction.Response response = - new ShardChangesAction.Response(indexMetaDataVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY); - item.add(new TestResponse(null, indexMetaDataVersion, response)); + new ShardChangesAction.Response(mappingVersion, prevGlobalCheckpoint, prevGlobalCheckpoint, EMPTY); + item.add(new TestResponse(null, mappingVersion, response)); } List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -241,14 +241,14 @@ private static TestRun createTestRun(long startSeqNo, long startIndexMetadataVer // Report toSeqNo to simulate maxBatchSizeInBytes limit being met or last op to simulate a shard lagging behind: long localLeaderGCP = randomBoolean() ? ops.get(ops.size() - 1).seqNo() : toSeqNo; ShardChangesAction.Response response = - new ShardChangesAction.Response(indexMetaDataVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY)); - item.add(new TestResponse(null, indexMetaDataVersion, response)); + new ShardChangesAction.Response(mappingVersion, localLeaderGCP, localLeaderGCP, ops.toArray(EMPTY)); + item.add(new TestResponse(null, mappingVersion, response)); responses.put(fromSeqNo, Collections.unmodifiableList(item)); } } prevGlobalCheckpoint = nextGlobalCheckPoint + 1; } - return new TestRun(maxOperationCount, startSeqNo, startIndexMetadataVersion, indexMetaDataVersion, + return new TestRun(maxOperationCount, startSeqNo, startMappingVersion, mappingVersion, prevGlobalCheckpoint - 1, responses); } @@ -261,18 +261,18 @@ private static class TestRun { final int maxOperationCount; final long startSeqNo; - final long startIndexMetadataVersion; + final long startMappingVersion; - final long finalIndexMetaDataVerion; + final long finalMappingVersion; final long finalExpectedGlobalCheckpoint; final Map> responses; - private TestRun(int maxOperationCount, long startSeqNo, long startIndexMetadataVersion, long finalIndexMetaDataVerion, + private TestRun(int maxOperationCount, long startSeqNo, long startMappingVersion, long finalMappingVersion, long finalExpectedGlobalCheckpoint, Map> responses) { this.maxOperationCount = maxOperationCount; this.startSeqNo = startSeqNo; - this.startIndexMetadataVersion = startIndexMetadataVersion; - this.finalIndexMetaDataVerion = finalIndexMetaDataVerion; + this.startMappingVersion = startMappingVersion; + this.finalMappingVersion = finalMappingVersion; this.finalExpectedGlobalCheckpoint = finalExpectedGlobalCheckpoint; this.responses = Collections.unmodifiableMap(responses); } @@ -281,12 +281,12 @@ private TestRun(int maxOperationCount, long startSeqNo, long startIndexMetadataV private static class TestResponse { final Exception exception; - final long indexMetadataVersion; + final long mappingVersion; final ShardChangesAction.Response response; - private TestResponse(Exception exception, long indexMetadataVersion, ShardChangesAction.Response response) { + private TestResponse(Exception exception, long mappingVersion, ShardChangesAction.Response response) { this.exception = exception; - this.indexMetadataVersion = indexMetadataVersion; + this.mappingVersion = mappingVersion; this.response = response; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java index 4eb4283091959..234b7334e64fb 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java @@ -65,7 +65,7 @@ protected void assertEqualInstances(final ShardFollowNodeTask.Status expectedIns assertThat(newInstance.numberOfConcurrentReads(), equalTo(expectedInstance.numberOfConcurrentReads())); assertThat(newInstance.numberOfConcurrentWrites(), equalTo(expectedInstance.numberOfConcurrentWrites())); assertThat(newInstance.numberOfQueuedWrites(), equalTo(expectedInstance.numberOfQueuedWrites())); - assertThat(newInstance.indexMetadataVersion(), equalTo(expectedInstance.indexMetadataVersion())); + assertThat(newInstance.mappingVersion(), equalTo(expectedInstance.mappingVersion())); assertThat(newInstance.totalFetchTimeMillis(), equalTo(expectedInstance.totalFetchTimeMillis())); assertThat(newInstance.numberOfSuccessfulFetches(), equalTo(expectedInstance.numberOfSuccessfulFetches())); assertThat(newInstance.numberOfFailedFetches(), equalTo(expectedInstance.numberOfFailedFetches())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java index 54aef6bd3d116..4f7c0bf16645c 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java @@ -51,7 +51,7 @@ public class ShardFollowNodeTaskTests extends ESTestCase { private Queue readFailures; private Queue writeFailures; private Queue mappingUpdateFailures; - private Queue imdVersions; + private Queue mappingVersions; private Queue leaderGlobalCheckpoints; private Queue followerGlobalCheckpoints; private Queue maxSeqNos; @@ -180,7 +180,7 @@ public void testReceiveRetryableError() { for (int i = 0; i < max; i++) { readFailures.add(new ShardNotFoundException(new ShardId("leader_index", "", 0))); } - imdVersions.add(1L); + mappingVersions.add(1L); leaderGlobalCheckpoints.add(63L); maxSeqNos.add(63L); simulateResponse.set(true); @@ -327,7 +327,7 @@ public void testHandleReadResponse() { assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(0L)); + assertThat(status.mappingVersion(), equalTo(0L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(1)); @@ -433,7 +433,7 @@ public void testMappingUpdate() { ShardFollowNodeTask task = createShardFollowTask(64, 1, 1, Integer.MAX_VALUE, Long.MAX_VALUE); startTask(task, 63, -1); - imdVersions.add(1L); + mappingVersions.add(1L); task.coordinateReads(); ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); task.handleReadResponse(0L, 63L, response); @@ -442,7 +442,7 @@ public void testMappingUpdate() { assertThat(bulkShardOperationRequests.get(0), equalTo(Arrays.asList(response.getOperations()))); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(1L)); + assertThat(status.mappingVersion(), equalTo(1L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(1)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -458,7 +458,7 @@ public void testMappingUpdateRetryableError() { for (int i = 0; i < max; i++) { mappingUpdateFailures.add(new ConnectException()); } - imdVersions.add(1L); + mappingVersions.add(1L); task.coordinateReads(); ShardChangesAction.Response response = generateShardChangesResponse(0, 63, 1L, 63L); task.handleReadResponse(0L, 63L, response); @@ -467,7 +467,7 @@ public void testMappingUpdateRetryableError() { assertThat(bulkShardOperationRequests.size(), equalTo(1)); assertThat(task.isStopped(), equalTo(false)); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(1L)); + assertThat(status.mappingVersion(), equalTo(1L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(1)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -483,17 +483,17 @@ public void testMappingUpdateRetryableErrorRetriedTooManyTimes() { for (int i = 0; i < max; i++) { mappingUpdateFailures.add(new ConnectException()); } - imdVersions.add(1L); + mappingVersions.add(1L); task.coordinateReads(); ShardChangesAction.Response response = generateShardChangesResponse(0, 64, 1L, 64L); task.handleReadResponse(0L, 64L, response); assertThat(mappingUpdateFailures.size(), equalTo(max - 11)); - assertThat(imdVersions.size(), equalTo(1)); + assertThat(mappingVersions.size(), equalTo(1)); assertThat(bulkShardOperationRequests.size(), equalTo(0)); assertThat(task.isStopped(), equalTo(true)); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(0L)); + assertThat(status.mappingVersion(), equalTo(0L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -512,7 +512,7 @@ public void testMappingUpdateNonRetryableError() { assertThat(bulkShardOperationRequests.size(), equalTo(0)); assertThat(task.isStopped(), equalTo(true)); ShardFollowNodeTask.Status status = task.getStatus(); - assertThat(status.indexMetadataVersion(), equalTo(0L)); + assertThat(status.mappingVersion(), equalTo(0L)); assertThat(status.numberOfConcurrentReads(), equalTo(1)); assertThat(status.numberOfConcurrentWrites(), equalTo(0)); assertThat(status.lastRequestedSeqNo(), equalTo(63L)); @@ -723,7 +723,7 @@ ShardFollowNodeTask createShardFollowTask(int maxBatchOperationCount, int maxCon readFailures = new LinkedList<>(); writeFailures = new LinkedList<>(); mappingUpdateFailures = new LinkedList<>(); - imdVersions = new LinkedList<>(); + mappingVersions = new LinkedList<>(); leaderGlobalCheckpoints = new LinkedList<>(); followerGlobalCheckpoints = new LinkedList<>(); maxSeqNos = new LinkedList<>(); @@ -738,9 +738,9 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer erro return; } - Long imdVersion = imdVersions.poll(); - if (imdVersion != null) { - handler.accept(imdVersion); + final Long mappingVersion = mappingVersions.poll(); + if (mappingVersion != null) { + handler.accept(mappingVersion); } } @@ -779,7 +779,7 @@ protected void innerSendShardChangesRequest(long from, int requestBatchSize, Con } final ShardChangesAction.Response response = new ShardChangesAction.Response( - imdVersions.poll(), + mappingVersions.poll(), leaderGlobalCheckpoints.poll(), maxSeqNos.poll(), operations); @@ -805,7 +805,7 @@ public void markAsFailed(Exception e) { }; } - private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long imdVersion, + private static ShardChangesAction.Response generateShardChangesResponse(long fromSeqNo, long toSeqNo, long mappingVersion, long leaderGlobalCheckPoint) { List ops = new ArrayList<>(); for (long seqNo = fromSeqNo; seqNo <= toSeqNo; seqNo++) { @@ -814,7 +814,7 @@ private static ShardChangesAction.Response generateShardChangesResponse(long fro ops.add(new Translog.Index("doc", id, seqNo, 0, source)); } return new ShardChangesAction.Response( - imdVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); + mappingVersion, leaderGlobalCheckPoint, leaderGlobalCheckPoint, ops.toArray(new Translog.Operation[0])); } void startTask(ShardFollowNodeTask task, long leaderGlobalCheckpoint, long followerGlobalCheckpoint) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index 32cc876125701..ec180943a3b5b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -209,7 +209,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co try { Translog.Operation[] ops = ShardChangesAction.getOperations(indexShard, seqNoStats.getGlobalCheckpoint(), from, maxOperationCount, params.getMaxBatchSizeInBytes()); - // Hard code index metadata version, this is ok, as mapping updates are not tested here. + // hard code mapping version; this is ok, as mapping updates are not tested here final ShardChangesAction.Response response = new ShardChangesAction.Response(1L, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), ops); handler.accept(response); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml index a38698a45be41..c64cbe7690f6f 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/ccr/stats.yml @@ -33,7 +33,7 @@ - gte: { bar.0.number_of_concurrent_reads: 0 } - match: { bar.0.number_of_concurrent_writes: 0 } - match: { bar.0.number_of_queued_writes: 0 } - - gte: { bar.0.index_metadata_version: 0 } + - gte: { bar.0.mapping_version: 0 } - gte: { bar.0.total_fetch_time_millis: 0 } - gte: { bar.0.number_of_successful_fetches: 0 } - gte: { bar.0.number_of_failed_fetches: 0 }