-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
TestTranslog.java
371 lines (326 loc) · 15.2 KB
/
TestTranslog.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.index.translog;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.opensearch.index.translog.Translog.CHECKPOINT_FILE_NAME;
import static org.opensearch.index.translog.Translog.TRANSLOG_FILE_SUFFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
/**
* Helpers for testing translog.
*/
public class TestTranslog {
    // Matches numbered translog and checkpoint files ("translog-N.tlog" / "translog-N.ckp"),
    // capturing the generation N in group 1.
    private static final Pattern TRANSLOG_FILE_PATTERN = Pattern.compile("^translog-(\\d+)\\.(tlog|ckp)$");

    /**
     * Corrupts a random translog file (translog-N.tlog or translog-N.ckp or translog.ckp) from the given translog directory, ignoring
     * translogs and checkpoints with generations below the minimum generation referenced by the directory's current checkpoint
     * ({@code translog.ckp}), or writes a corrupted translog-N.ckp file as if from a crash while rolling a generation.
     *
     * <p>
     * See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied.
     */
    public static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir) throws IOException {
        corruptRandomTranslogFile(logger, random, translogDir, Translog.readCheckpoint(translogDir).minTranslogGeneration);
    }

    /**
     * Corrupts random translog file (translog-N.tlog or translog-N.ckp or translog.ckp) from the given translog directory, or writes a
     * corrupted translog-N.ckp file as if from a crash while rolling a generation.
     * <p>
     * See {@link TestTranslog#corruptFile(Logger, Random, Path, boolean)} for details of the corruption applied.
     *
     * @param minGeneration the minimum generation (N) to corrupt. Translogs and checkpoints with lower generation numbers are ignored.
     */
    static void corruptRandomTranslogFile(Logger logger, Random random, Path translogDir, long minGeneration) throws IOException {
        logger.info("--> corruptRandomTranslogFile: translogDir [{}], minUsedTranslogGen [{}]", translogDir, minGeneration);
        // Tracks the numbered copy of translog.ckp we may create below; deleting it is not a
        // "real" corruption, so corruptFile is told not to delete it (see maybeDelete).
        Path unnecessaryCheckpointCopyPath = null;
        try {
            final Path checkpointPath = translogDir.resolve(CHECKPOINT_FILE_NAME);
            final Checkpoint checkpoint = Checkpoint.read(checkpointPath);
            unnecessaryCheckpointCopyPath = translogDir.resolve(Translog.getCommitCheckpointFileName(checkpoint.generation));
            if (LuceneTestCase.rarely(random) && Files.exists(unnecessaryCheckpointCopyPath) == false) {
                // if we crashed while rolling a generation then we might have copied `translog.ckp` to its numbered generation file but
                // have not yet written a new `translog.ckp`. During recovery we must also verify that this file is intact, so it's ok to
                // corrupt this file too (either by writing the wrong information, correctly formatted, or by properly corrupting it)
                final Checkpoint checkpointCopy;
                if (LuceneTestCase.usually(random)) {
                    // usually: write a faithful copy, which then stays a candidate for byte-level corruption below
                    checkpointCopy = checkpoint;
                } else {
                    // rarely: write a well-formed checkpoint whose fields are randomly perturbed by 0 or 1,
                    // while keeping the internal invariants (min <= max, etc.) intact
                    long newTranslogGeneration = checkpoint.generation + random.nextInt(2);
                    long newMinTranslogGeneration = Math.min(newTranslogGeneration, checkpoint.minTranslogGeneration + random.nextInt(2));
                    long newMaxSeqNo = checkpoint.maxSeqNo + random.nextInt(2);
                    long newMinSeqNo = Math.min(newMaxSeqNo, checkpoint.minSeqNo + random.nextInt(2));
                    long newTrimmedAboveSeqNo = Math.min(newMaxSeqNo, checkpoint.trimmedAboveSeqNo + random.nextInt(2));
                    checkpointCopy = new Checkpoint(
                        checkpoint.offset + random.nextInt(2),
                        checkpoint.numOps + random.nextInt(2),
                        newTranslogGeneration,
                        newMinSeqNo,
                        newMaxSeqNo,
                        checkpoint.globalCheckpoint + random.nextInt(2),
                        newMinTranslogGeneration,
                        newTrimmedAboveSeqNo
                    );
                }
                Checkpoint.write(
                    FileChannel::open,
                    unnecessaryCheckpointCopyPath,
                    checkpointCopy,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.CREATE_NEW
                );
                if (checkpointCopy.equals(checkpoint) == false) {
                    // the wrong-content copy IS the corruption; nothing further to do
                    logger.info(
                        "corruptRandomTranslogFile: created [{}] containing [{}] instead of [{}]",
                        unnecessaryCheckpointCopyPath,
                        checkpointCopy,
                        checkpoint
                    );
                    return;
                } // else checkpoint copy has the correct content so it's now a candidate for the usual kinds of corruption
            }
        } catch (TranslogCorruptedException e) {
            // missing or corrupt checkpoint already, find something else to break...
        }

        // Collect corruption candidates: translog.ckp plus any non-empty numbered file at or above minGeneration.
        Set<Path> candidates = new TreeSet<>(); // TreeSet makes sure iteration order is deterministic
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(translogDir)) {
            for (Path item : stream) {
                if (Files.isRegularFile(item) && Files.size(item) > 0) {
                    final String filename = item.getFileName().toString();
                    final Matcher matcher = TRANSLOG_FILE_PATTERN.matcher(filename);
                    if (filename.equals("translog.ckp") || (matcher.matches() && Long.parseLong(matcher.group(1)) >= minGeneration)) {
                        candidates.add(item);
                    }
                }
            }
        }
        assertThat("no corruption candidates found in " + translogDir, candidates, is(not(empty())));

        final Path fileToCorrupt = RandomPicks.randomFrom(random, candidates);

        // deleting the unnecessary checkpoint file doesn't count as a corruption
        final boolean maybeDelete = fileToCorrupt.equals(unnecessaryCheckpointCopyPath) == false;

        corruptFile(logger, random, fileToCorrupt, maybeDelete);
    }

    /**
     * Corrupt an (existing and nonempty) file by replacing any byte in the file with a random (different) byte, or by truncating the file
     * to a random (strictly shorter) length, or by deleting the file.
     *
     * @param maybeDelete if {@code true}, the file may (with probability 1/4) be deleted instead of modified in place
     */
    static void corruptFile(Logger logger, Random random, Path fileToCorrupt, boolean maybeDelete) throws IOException {
        assertThat(fileToCorrupt + " should be a regular file", Files.isRegularFile(fileToCorrupt));
        final long fileSize = Files.size(fileToCorrupt);
        assertThat(fileToCorrupt + " should not be an empty file", fileSize, greaterThan(0L));

        if (maybeDelete && random.nextBoolean() && random.nextBoolean()) {
            logger.info("corruptFile: deleting file {}", fileToCorrupt);
            IOUtils.rm(fileToCorrupt);
            return;
        }

        try (FileChannel fileChannel = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            final long corruptPosition = RandomNumbers.randomLongBetween(random, 0, fileSize - 1);

            if (random.nextBoolean()) {
                // Flip one byte in place. Repeat while the flip merely turned a translog header version
                // from 3 to 2, because such a "corruption" would go undetected (see isTranslogHeaderVersionFlipped).
                do {
                    // read
                    fileChannel.position(corruptPosition);
                    assertThat(fileChannel.position(), equalTo(corruptPosition));
                    ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
                    fileChannel.read(bb);
                    bb.flip();

                    // corrupt
                    byte oldValue = bb.get(0);
                    byte newValue;
                    do {
                        newValue = (byte) random.nextInt(0x100);
                    } while (newValue == oldValue);
                    bb.put(0, newValue);

                    // rewrite
                    fileChannel.position(corruptPosition);
                    fileChannel.write(bb);
                    logger.info(
                        "corruptFile: corrupting file {} at position {} turning 0x{} into 0x{}",
                        fileToCorrupt,
                        corruptPosition,
                        Integer.toHexString(oldValue & 0xff),
                        Integer.toHexString(newValue & 0xff)
                    );
                } while (isTranslogHeaderVersionFlipped(fileToCorrupt, fileChannel));
            } else {
                logger.info("corruptFile: truncating file {} from length {} to length {}", fileToCorrupt, fileSize, corruptPosition);
                fileChannel.truncate(corruptPosition);
            }
        }
    }

    /**
     * Returns the primary term associated with the current translog writer of the given translog.
     */
    public static long getCurrentTerm(Translog translog) {
        return translog.getCurrent().getPrimaryTerm();
    }

    /**
     * Drains all remaining operations from the given snapshot into a list.
     *
     * @param snapshot    the snapshot to drain; it is read to exhaustion but not closed
     * @param sortBySeqNo if {@code true}, the returned operations are sorted by sequence number
     * @return the drained operations
     */
    public static List<Translog.Operation> drainSnapshot(Translog.Snapshot snapshot, boolean sortBySeqNo) throws IOException {
        final List<Translog.Operation> ops = new ArrayList<>(snapshot.totalOperations());
        Translog.Operation op;
        while ((op = snapshot.next()) != null) {
            ops.add(op);
        }
        if (sortBySeqNo) {
            ops.sort(Comparator.comparing(Translog.Operation::seqNo));
        }
        return ops;
    }

    /**
     * Wraps a fixed list of operations in a {@link Translog.Snapshot} view. The returned snapshot
     * iterates the list once, in order; {@code close()} is a no-op.
     */
    public static Translog.Snapshot newSnapshotFromOperations(List<Translog.Operation> operations) {
        final Iterator<Translog.Operation> iterator = operations.iterator();
        return new Translog.Snapshot() {
            @Override
            public int totalOperations() {
                return operations.size();
            }

            @Override
            public Translog.Operation next() {
                if (iterator.hasNext()) {
                    return iterator.next();
                } else {
                    return null;
                }
            }

            @Override
            public void close() {

            }
        };
    }

    /**
     * An old translog header does not have a checksum. If we flip the header version of an empty translog from 3 to 2,
     * then we won't detect that corruption, and the translog will be considered clean as before.
     *
     * @return {@code true} if the (corrupted) file is a {@code .tlog} file whose header now reads as the
     *         checksum-less {@link TranslogHeader#VERSION_CHECKPOINTS} version
     */
    static boolean isTranslogHeaderVersionFlipped(Path corruptedFile, FileChannel channel) throws IOException {
        if (corruptedFile.toString().endsWith(TRANSLOG_FILE_SUFFIX) == false) {
            // only .tlog files carry this header; checkpoint files are always fully checked
            return false;
        }
        channel.position(0);
        final InputStreamStreamInput in = new InputStreamStreamInput(Channels.newInputStream(channel), channel.size());
        try {
            final int version = TranslogHeader.readHeaderVersion(corruptedFile, channel, in);
            return version == TranslogHeader.VERSION_CHECKPOINTS;
        } catch (IllegalStateException | TranslogCorruptedException | IOException e) {
            // header no longer parses at all: that corruption WILL be detected, so the flip "worked"
            return false;
        }
    }

    /**
     * Pairs a translog operation with the {@link Translog.Location} it was written to;
     * naturally ordered by location.
     */
    static class LocationOperation implements Comparable<LocationOperation> {
        final Translog.Operation operation;
        final Translog.Location location;

        LocationOperation(Translog.Operation operation, Translog.Location location) {
            this.operation = operation;
            this.location = location;
        }

        @Override
        public int compareTo(LocationOperation o) {
            return location.compareTo(o.location);
        }
    }

    /**
     * A mutable switch that decides randomly, per call, whether an injected failure should occur.
     * The failure probability is {@code failRate} percent and can be changed at any time;
     * {@link #onceFailedFailAlways()} latches the switch to always fail after the first failure.
     */
    static class FailSwitch {
        // percentage chance (0-100) that fail() returns true
        private volatile int failRate;
        // when set, the first failure promotes failRate to 100 permanently
        private volatile boolean onceFailedFailAlways = false;

        public boolean fail() {
            final int rnd = OpenSearchTestCase.randomIntBetween(1, 100);
            boolean fail = rnd <= failRate;
            if (fail && onceFailedFailAlways) {
                failAlways();
            }
            return fail;
        }

        public void failNever() {
            failRate = 0;
        }

        public void failAlways() {
            failRate = 100;
        }

        public void failRandomly() {
            failRate = OpenSearchTestCase.randomIntBetween(1, 100);
        }

        public void failRate(int rate) {
            failRate = rate;
        }

        public void onceFailedFailAlways() {
            onceFailedFailAlways = true;
        }
    }

    /**
     * A {@link Translog.Snapshot} decorator that fully drains its delegate on the first call to
     * {@link #next()} and then returns the buffered operations sorted by sequence number.
     */
    static class SortedSnapshot implements Translog.Snapshot {
        private final Translog.Snapshot snapshot;
        // lazily filled, sorted buffer; operations are handed out from the front
        private List<Translog.Operation> operations = null;

        SortedSnapshot(Translog.Snapshot snapshot) {
            this.snapshot = snapshot;
        }

        @Override
        public int totalOperations() {
            return snapshot.totalOperations();
        }

        @Override
        public Translog.Operation next() throws IOException {
            if (operations == null) {
                operations = new ArrayList<>();
                Translog.Operation op;
                while ((op = snapshot.next()) != null) {
                    operations.add(op);
                }
                operations.sort(Comparator.comparing(Translog.Operation::seqNo));
            }
            if (operations.isEmpty()) {
                return null;
            }
            return operations.remove(0);
        }

        @Override
        public void close() throws IOException {
            snapshot.close();
        }
    }
}