-
Notifications
You must be signed in to change notification settings - Fork 5
/
Sort.java
339 lines (306 loc) · 13.4 KB
/
Sort.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
package qp.operators;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.PriorityQueue;
import java.util.UUID;
import java.util.Vector;
import qp.utils.Attribute;
import qp.utils.Batch;
import qp.utils.Tuple;
import qp.utils.TupleInRun;
/**
 * Applies external merge sort to the tuples produced by a base operator.
 *
 * <p>The operator is blocking: {@link #open()} consumes the whole input, writes sorted runs to
 * temporary files and merges them until a single run remains; {@link #next()} then streams that
 * run back page by page. Temporary run files are deleted as soon as they are no longer needed.
 */
public class Sort extends Operator {
    // The base operator (i.e., the unsorted relation).
    private final Operator base;
    // A random UUID to avoid file-name conflicts between different instances of Sort operators.
    private final String uuid = UUID.randomUUID().toString();
    // The number of buffer pages available.
    private final int numOfBuffers;
    // The schema indices of the attributes to sort on, in order of priority.
    private final Vector<Integer> sortKeyIndices = new Vector<>();
    // The number of tuples per batch (page).
    private final int batchSize;
    // The input stream from which we read the sorted result.
    private ObjectInputStream sortedStream;
    // The file backing sortedStream; deleted when the operator is closed.
    private String sortedFileName;
    // Records whether we have reached end-of-stream for the sorted result.
    private boolean eos = false;

    /**
     * Creates a new sort operator.
     *
     * @param base         is the base operator.
     * @param attrList     is the list of {@link Attribute}s to sort on; earlier attributes take priority.
     * @param numOfBuffers is the number of buffers (in pages) available.
     */
    public Sort(Operator base, Vector<?> attrList, int numOfBuffers) {
        super(OpType.SORT);
        this.schema = base.schema;
        this.base = base;
        this.numOfBuffers = numOfBuffers;
        this.batchSize = Batch.getPageSize() / schema.getTupleSize();
        for (Object attribute : attrList) {
            sortKeyIndices.add(schema.indexOf((Attribute) attribute));
        }
    }

    /**
     * Opens the operator. The {@link Sort} operator is not iterator-based: the whole input is sorted
     * (materialised to disk) here, so that {@link #next()} only has to stream the result back.
     *
     * @return true if the operator is opened successfully (including when the input is empty); false
     *     if a tuple does not fit in a page, no buffer is available, the base operator cannot be
     *     opened, there are too few buffers to merge the runs, or the sorted result cannot be read.
     */
    @Override
    public boolean open() {
        if (batchSize < 1) {
            System.err.printf("Sort: tuple size %d exceeds page size %d\n", schema.getTupleSize(), Batch.getPageSize());
            return false;
        }
        if (numOfBuffers < 1) {
            System.err.printf("Sort: at least 1 buffer is required, but got %d\n", numOfBuffers);
            return false;
        }
        // Makes sure the base operator can open successfully first.
        if (!base.open()) {
            return false;
        }
        eos = false;
        // Phase 1: generate sorted runs using in-memory sorting.
        int numOfRuns = generateSortedRuns();
        if (numOfRuns == 0) {
            // Empty input: nothing to merge, so next() reports end-of-stream straight away.
            eos = true;
            return true;
        }
        // Phase 2: merge sorted runs together (possibly over multiple passes).
        return mergeRuns(numOfRuns, 1) == 1;
    }

    /**
     * Generates sorted runs and stores them to disk as pass 0. Each run holds up to numOfBuffers
     * pages of input, which minimizes the number of runs generated.
     *
     * @return the number of sorted runs generated (0 if the base operator produced no pages).
     */
    private int generateSortedRuns() {
        int numOfRuns = 0;
        Batch inBatch = base.next();
        while (inBatch != null) {
            // Stores the tuples that will be in the current run.
            Vector<Tuple> tuplesInRun = new Vector<>();
            int pagesInRun = 0;
            while (inBatch != null) {
                tuplesInRun.addAll(inBatch.getTuples());
                pagesInRun++;
                // Do NOT read ahead once every buffer is in use; the next page is fetched only after
                // the current run has been written out.
                if (pagesInRun == numOfBuffers) {
                    break;
                }
                inBatch = base.next();
            }
            // Sorts the tuples using in-memory sorting.
            tuplesInRun.sort(this::compareTuples);
            String fileName = getSortedRunFileName(0, numOfRuns);
            try (RunWriter writer = new RunWriter(fileName, batchSize)) {
                for (Tuple tuple : tuplesInRun) {
                    writer.write(tuple);
                }
            } catch (IOException e) {
                System.err.printf("Sort: unable to write sortedRun with ID=%d due to %s\n", numOfRuns, e.toString());
                System.exit(1);
            }
            numOfRuns++;
            // inBatch is non-null here exactly when the run stopped because the buffers were full;
            // otherwise the base operator has already signalled end-of-stream and must not be asked again.
            if (inBatch != null) {
                inBatch = base.next();
            }
        }
        return numOfRuns;
    }

    /**
     * Repeatedly merges sorted runs, (numOfBuffers - 1) at a time, until at most one run remains, and
     * then opens that run as the sorted result stream.
     *
     * @param numOfRuns is the number of sorted runs produced by the previous pass.
     * @param passID    is the ID of the first merge pass.
     * @return the number of runs left: 1 on success; 0 if the final run cannot be opened (or there
     *     were no runs); more than 1 if there are too few buffers to merge.
     */
    private int mergeRuns(int numOfRuns, int passID) {
        // Uses (numOfBuffers - 1) pages as input buffers, and the remaining one as output buffer.
        int fanIn = numOfBuffers - 1;
        while (numOfRuns > 1) {
            if (fanIn < 2) {
                // A fan-in below 2 can never reduce the number of runs (the original recursed forever).
                System.err.printf("Sort: need at least 3 buffers to merge %d sorted runs, but only %d are available\n", numOfRuns, numOfBuffers);
                return numOfRuns;
            }
            int numOfOutputRuns = 0;
            for (int startRunID = 0; startRunID < numOfRuns; startRunID += fanIn) {
                int endRunID = Math.min(startRunID + fanIn, numOfRuns);
                try {
                    mergeRunsBetween(startRunID, endRunID, passID, numOfOutputRuns);
                } catch (IOException e) {
                    System.err.printf("Sort: cannot mergeRuns on passID=%d for [%d, %d) due to %s\n", passID, startRunID, endRunID, e.toString());
                    System.exit(1);
                } catch (ClassNotFoundException e) {
                    System.err.printf("Sort: class not found on passID=%d for [%d, %d) due to %s\n", passID, startRunID, endRunID, e.toString());
                    System.exit(1);
                }
                numOfOutputRuns++;
            }
            numOfRuns = numOfOutputRuns;
            passID++;
        }
        if (numOfRuns == 1) {
            // The single remaining run was written by the last completed pass.
            String fileName = getSortedRunFileName(passID - 1, 0);
            try {
                sortedStream = openRunForRead(fileName);
                sortedFileName = fileName;
            } catch (IOException e) {
                System.err.printf("Sort: cannot create sortedStream due to %s\n", e.toString());
                return 0;
            }
        }
        return numOfRuns;
    }

    /**
     * Merges the sorted runs in the range of [startRunID, endRunID) of pass (passID - 1) into run
     * outID of pass passID (a k-way merge). Since we have a fixed number of buffer pages, we assume
     * endRunID - startRunID <= numOfBuffers - 1. The input run files are deleted once merged.
     *
     * @param startRunID is the sorted run ID of the lower bound (inclusive).
     * @param endRunID   is the sorted run ID of the upper bound (exclusive).
     * @param passID     is the ID of the current pass.
     * @param outID      is the sorted run ID of the output.
     * @throws IOException            if a run file cannot be read or written.
     * @throws ClassNotFoundException if a stored tuple cannot be deserialized.
     */
    private void mergeRunsBetween(int startRunID, int endRunID, int passID, int outID) throws IOException, ClassNotFoundException {
        int numOfInputs = endRunID - startRunID;
        // Each input sorted run gets one buffer page and one stream to refill that page from.
        Batch[] inBatches = new Batch[numOfInputs];
        ObjectInputStream[] inStreams = new ObjectInputStream[numOfInputs];
        try {
            for (int i = 0; i < numOfInputs; i++) {
                inStreams[i] = openRunForRead(getSortedRunFileName(passID - 1, startRunID + i));
                inBatches[i] = readPage(inStreams[i]);
            }
            // Min-heap holding the smallest not-yet-written tuple of every non-exhausted input run.
            PriorityQueue<TupleInRun> heap = new PriorityQueue<>(Math.max(1, numOfInputs), (o1, o2) -> compareTuples(o1.tuple, o2.tuple));
            for (int i = 0; i < numOfInputs; i++) {
                if (!inBatches[i].isEmpty()) {
                    heap.add(new TupleInRun(inBatches[i].elementAt(0), i, 0));
                }
            }
            try (RunWriter writer = new RunWriter(getSortedRunFileName(passID, outID), batchSize)) {
                // Continues until every input run is exhausted.
                while (!heap.isEmpty()) {
                    TupleInRun smallest = heap.poll();
                    writer.write(smallest.tuple);
                    // Attempts to retrieve the next tuple from the same input run.
                    int runIndex = smallest.runID;
                    int nextIndex = smallest.tupleID + 1;
                    Batch page = inBatches[runIndex];
                    if (nextIndex >= page.size()) {
                        // A page that is not full means the run hit end-of-file while it was filled.
                        if (!page.isFull()) {
                            continue;
                        }
                        page = readPage(inStreams[runIndex]);
                        inBatches[runIndex] = page;
                        nextIndex = 0;
                        if (page.isEmpty()) {
                            continue;
                        }
                    }
                    heap.add(new TupleInRun(page.elementAt(nextIndex), runIndex, nextIndex));
                }
            }
        } finally {
            // Closes the input streams even if the merge failed part-way.
            for (ObjectInputStream inStream : inStreams) {
                if (inStream != null) {
                    inStream.close();
                }
            }
        }
        // The inputs have been fully merged into the output run and are no longer needed.
        for (int runID = startRunID; runID < endRunID; runID++) {
            deleteRunFile(getSortedRunFileName(passID - 1, runID));
        }
    }

    /**
     * Reads up to batchSize tuples from a run into a fresh page. A page that comes back not full
     * (possibly empty) means the stream reached end-of-file.
     *
     * @param in is the run stream to read from.
     * @return the page read.
     * @throws IOException            if the stream cannot be read.
     * @throws ClassNotFoundException if a stored tuple cannot be deserialized.
     */
    private Batch readPage(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Batch page = new Batch(batchSize);
        while (!page.isFull()) {
            try {
                page.add((Tuple) in.readObject());
            } catch (EOFException eof) {
                break;
            }
        }
        return page;
    }

    /**
     * Opens a run file for reading through a buffered stream.
     *
     * @param fileName is the name of the run file.
     * @return the object stream over the run file.
     * @throws IOException if the file cannot be opened or its stream header is invalid.
     */
    private static ObjectInputStream openRunForRead(String fileName) throws IOException {
        return new ObjectInputStream(new BufferedInputStream(new FileInputStream(fileName)));
    }

    /**
     * Deletes a temporary run file, logging (but otherwise ignoring) a failure to do so.
     *
     * @param fileName is the name of the run file.
     */
    private static void deleteRunFile(String fileName) {
        File file = new File(fileName);
        if (file.exists() && !file.delete()) {
            System.err.printf("Sort: unable to delete temporary file %s\n", fileName);
        }
    }

    /**
     * Writes tuples to a run file. The underlying object stream is reset after every resetInterval
     * tuples, so that neither this writer nor a later reader of the file keeps a reference to every
     * tuple ever written (object streams otherwise retain all objects for back-references).
     */
    private static final class RunWriter implements AutoCloseable {
        private final ObjectOutputStream out;
        private final int resetInterval;
        private int writtenSinceReset = 0;

        RunWriter(String fileName, int resetInterval) throws IOException {
            this.out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(fileName)));
            this.resetInterval = Math.max(1, resetInterval);
        }

        void write(Tuple tuple) throws IOException {
            out.writeObject(tuple);
            if (++writtenSinceReset >= resetInterval) {
                out.reset();
                writtenSinceReset = 0;
            }
        }

        @Override
        public void close() throws IOException {
            out.close();
        }
    }

    /**
     * Compares two tuples on the sort attributes, in priority order.
     *
     * @param tuple1 is the first tuple.
     * @param tuple2 is the second tuple.
     * @return an integer indicating the comparison result, compatible with the {@link java.util.Comparator} interface.
     */
    private int compareTuples(Tuple tuple1, Tuple tuple2) {
        for (int sortKeyIndex : sortKeyIndices) {
            int result = Tuple.compareTuples(tuple1, tuple2, sortKeyIndex);
            if (result != 0) {
                return result;
            }
        }
        return 0;
    }

    /**
     * Provides the file name of a generated sorted run based on its pass and run ID.
     *
     * @param passID is the ID of the pass that produced the run.
     * @param runID  is the ID of the sorted run within that pass.
     * @return the file name, unique to this operator instance.
     */
    private String getSortedRunFileName(int passID, int runID) {
        return "Sort-run-" + uuid + "-" + passID + "-" + runID;
    }

    /**
     * @return the next sorted page of tuples from the sorting result, or null once the result is
     *     exhausted (the operator is closed at that point).
     */
    @Override
    public Batch next() {
        if (eos || sortedStream == null) {
            close();
            return null;
        }
        Batch outBatch;
        try {
            outBatch = readPage(sortedStream);
        } catch (ClassNotFoundException cnf) {
            System.err.printf("Sort: class not found for reading from sortedStream due to %s\n", cnf.toString());
            System.exit(1);
            return null;
        } catch (IOException e) {
            System.err.printf("Sort: error reading from sortedStream due to %s\n", e.toString());
            System.exit(1);
            return null;
        }
        if (!outBatch.isFull()) {
            // The stream hit end-of-file: send the incomplete page and close in the next call, or
            // close right away if there is nothing left to send.
            eos = true;
            if (outBatch.isEmpty()) {
                close();
                return null;
            }
        }
        return outBatch;
    }

    /**
     * Closes the operator by gracefully closing the sorted stream and deleting the final run file.
     * Safe to call more than once, and when no sorted stream was ever opened.
     *
     * @return true if the operator is closed successfully.
     */
    @Override
    public boolean close() {
        // Calls the close method in super-class for compatibility.
        super.close();
        boolean closedCleanly = true;
        if (sortedStream != null) {
            try {
                sortedStream.close();
            } catch (IOException e) {
                System.err.printf("Sort: unable to close sortedStream due to %s\n", e.toString());
                closedCleanly = false;
            }
            sortedStream = null;
        }
        if (sortedFileName != null) {
            deleteRunFile(sortedFileName);
            sortedFileName = null;
        }
        return closedCleanly;
    }
}