-
Notifications
You must be signed in to change notification settings - Fork 0
/
SortedRecordRenderer.cpp
204 lines (181 loc) · 7.2 KB
/
SortedRecordRenderer.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#include "SortedRecordRenderer.h"
#include "utils.h"
#include <algorithm>
#include <cmath>
#include <cstdio>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <stdexcept>
#include <system_error>
// Set up a renderer for one run of one pass. When `materialize` is true, every produced
// row is copied into a Materializer (a spill file); otherwise no materializer exists and
// rows are returned directly from the renderer's inputs.
SortedRecordRenderer::SortedRecordRenderer (RowSize recordSize, u_int8_t pass, u_int16_t runNumber, bool removeDuplicates, byte * lastRow, bool materialize) :
	_recordSize (recordSize), _produced (0), _removeDuplicates (removeDuplicates), _lastRow (lastRow)
{
	TRACE (false);
	materializer = nullptr;
	if (! materialize) {
		traceprintf ("Materializer is disabled for pass %d run %d\n", pass, runNumber);
		return;
	}
	// _recordSize is already initialised above; the Materializer reads it through `this`.
	materializer = new Materializer(pass, runNumber, this);
} // SortedRecordRenderer::SortedRecordRenderer
// Release the materializer if run() has not already done so.
SortedRecordRenderer::~SortedRecordRenderer ()
{
	TRACE (false);
	// run() may already have deleted it and reset the pointer; deleting nullptr is a no-op.
	delete materializer;
	materializer = nullptr;
} // SortedRecordRenderer::~SortedRecordRenderer
/**
 * Drain this renderer by calling next() until it returns nullptr, then finalize the
 * materialized spill file and return its name.
 *
 * The materializer is deleted here (its destructor flushes any still-buffered rows and
 * closes the file), so run() can succeed at most once per renderer.
 *
 * @return name of the spill file holding the produced rows
 * @throws std::invalid_argument if no materializer is present (materialization disabled,
 *         or run() already called); checked before any input is consumed
 */
string SortedRecordRenderer::run ()
{
	TRACE (false);
	// Validate before draining so misuse does not consume the whole input only to fail.
	if (materializer == nullptr) {
		throw std::invalid_argument("SortedRecordRenderer::run can only be invoked when Materializer is enabled.");
	}
	while (next() != nullptr) {
		// Presumably next() produces rows via renderRow(), which copies each one into
		// the materializer; the rows themselves are not needed here.
	}
	// NOTE(review): the name is captured before ~Materializer performs the final flush.
	// If that flush makes switchDevice() rename the file, the returned name is stale — confirm.
	string outputFileName = materializer->outputFileName;
#if defined(VERBOSEL2)
	traceprintf ("%s: produced %llu rows\n", outputFileName.c_str(), static_cast<unsigned long long>(_produced));
	traceprintf ("Run through renderer with output file %s\n", outputFileName.c_str());
#endif
	delete materializer;
	materializer = nullptr;
	return outputFileName;
} // SortedRecordRenderer::run
/**
 * Produce the next output row by merging the candidate row offered by `tree` with the
 * one offered by `longRun`, always taking the smaller (memcmp over _recordSize bytes;
 * ties go to the tree). When _removeDuplicates is set, rows equal to the previously
 * produced row (_lastRow) are consumed without being returned.
 *
 * @param retrieveNext  called after the tree's root row is consumed; a non-null result is
 *                      pushed into the tree (pushAndPoll), nullptr just removes the root (poll)
 * @param tree          tournament tree supplying its candidate row via peekRoot()
 * @param longRun       optional second input (peek()/next()); may be nullptr
 * @return the produced row, or nullptr once both inputs are exhausted. With a materializer
 *         the pointer refers to the copy in its output buffer; otherwise it is the input row.
 */
byte * SortedRecordRenderer::renderRow(std::function<byte *()> retrieveNext, TournamentTree *& tree, ExternalRun * longRun)
{
	byte * output = nullptr;
	while (true) {
		byte * renderedFromTree = tree->peekRoot();
		byte * renderedFromLongRun = longRun == nullptr ? nullptr : longRun->peek();
		// Choose the smaller candidate; on a tie the tree wins. Null means "input exhausted".
		byte * rendered;
		if (renderedFromTree == nullptr) rendered = renderedFromLongRun;
		else if (renderedFromLongRun == nullptr) rendered = renderedFromTree;
		else if (memcmp(renderedFromTree, renderedFromLongRun, _recordSize) <= 0) rendered = renderedFromTree;
		else rendered = renderedFromLongRun;
		// if no more rows, jump out
		if (rendered == nullptr) break;

		const bool isDistinct =
			! _removeDuplicates ||                         // not removing duplicates
			_lastRow == nullptr ||                         // nothing produced yet
			memcmp(_lastRow, rendered, _recordSize) != 0;  // differs from the last produced row
		if (isDistinct) {
			if (materializer != nullptr) {
				// copy before retrieving next, as retrieving next could overwrite the current page in ExternalRenderer
				output = materializer->addRowToOutputBuffer(rendered);
			} else {
				// NOTE(review): without a materializer, output/_lastRow point into the input.
				// Per the comment above, advancing the input may overwrite that memory before
				// the caller (or the next duplicate check) reads it — confirm callers tolerate this.
				output = rendered;
			}
			_lastRow = output;
		} else { // duplicate: consume it and keep looking for a distinct row
#if defined(VERBOSEL2)
			traceprintf ("%s removed\n", rowToString(rendered, _recordSize).c_str());
#endif
		}

		// Consume the chosen row from whichever input supplied it.
		if (rendered == renderedFromTree) {
			byte * retrieved = retrieveNext();
			if (retrieved == nullptr) {
				tree->poll();
			} else {
				tree->pushAndPoll(retrieved);
			}
		} else {
			longRun->next();
		}
		if (isDistinct) break;
	}
	return output;
} // SortedRecordRenderer::renderRow
/**
 * Prepare spilling of one sorted run.
 *
 * - Picks the storage device Metrics currently reports as available.
 * - Sizes the output buffer to the whole records that fit in one page of that device.
 * - Opens the spill file named by getOutputFileName(pass, runNumber): spills/pass<pass>/run<runNumber>-device<device>.
 */
Materializer::Materializer(u_int8_t pass, u_int16_t runNumber, SortedRecordRenderer * renderer) :
	renderer (renderer), deviceType (Metrics::getAvailableStorage())
{
	TRACE (false);
	auto pageSize = Metrics::getParams(deviceType).pageSize;
	outputBuffer = new Buffer(pageSize / renderer->_recordSize, renderer->_recordSize);
	outputFileName = getOutputFileName(pass, runNumber);
	// ofstream does not create missing directories, and a failed open would silently drop
	// the spill. Creating the pass directory is best effort (non-throwing overload): if it
	// fails, the open below fails exactly as it would have without this step.
	std::error_code dirError;
	std::filesystem::create_directories(std::filesystem::path(outputFileName).parent_path(), dirError);
	outputFile = ofstream(outputFileName, std::ios::binary);
	Trace::finalOutputFileName = outputFileName;
#ifdef PRODUCTION
	Trace::PrintTrace(OP_STATE, deviceType == STORAGE_SSD? SPILL_RUNS_SSD : SPILL_RUNS_HDD,
		string("Spill sorted runs to the ") + getDeviceName(deviceType) + " device");
#endif
#if defined(VERBOSEL2)
	traceprintf ("Materializer for run %d with output file %s initialized on device %d\n", runNumber, outputFileName.c_str(), deviceType);
#endif
} // Materializer::Materializer
// Write out whatever rows are still buffered, close the spill file, and free the buffer.
Materializer::~Materializer ()
{
	TRACE (false);
	// The flush runs only while the file is open; the file is closed only if the flush succeeded.
	if (outputFile.is_open() && flushOutputBuffer(outputBuffer->sizeFilled())) {
		outputFile.close();
	}
	delete outputBuffer;
} // Materializer::~Materializer
// Build the spill path "." SEP "spills" SEP "pass<pass>" SEP "run<runNumber>-device<deviceType>",
// where SEP is SEPARATOR and deviceType is this materializer's current device.
string Materializer::getOutputFileName (u_int8_t pass, u_int16_t runNumber)
{
	string path (".");
	path += SEPARATOR;
	path += "spills";
	path += SEPARATOR;
	path += "pass" + std::to_string(pass);
	path += SEPARATOR;
	path += "run" + std::to_string(runNumber);
	path += "-device" + std::to_string(deviceType);
	return path;
} // Materializer::getOutputFileName
/**
 * Copy one row into the output buffer and count it as produced by the renderer. If the
 * buffer is full, it is flushed to the spill file first and the copy is retried.
 *
 * @param row  row to copy; nullptr is ignored
 * @return pointer to the copy inside the output buffer, or nullptr if `row` is nullptr
 */
byte * Materializer::addRowToOutputBuffer(byte * row)
{
	TRACE (false);
	if (row == nullptr) return nullptr;
	byte * output = outputBuffer->copy(row);
	while (output == nullptr) { // Output buffer is full
		// flushOutputBuffer emits its own VERBOSEL2 trace for the flush.
		// NOTE(review): the retry assumes Buffer has room again after a flush; otherwise
		// this loop never terminates — confirm against Buffer::copy.
		bool flush = flushOutputBuffer(outputBuffer->pageSize);
		if (flush) output = outputBuffer->copy(row);
	}
	++ renderer->_produced;
	return output;
} // Materializer::addRowToOutputBuffer
/**
 * Write the first `sizeFilled` bytes of the output buffer to the spill file and record
 * the write in Metrics. The write may trigger a device switch (see switchDevice); if it
 * does, the write is accounted to the device switchDevice reports.
 *
 * @param sizeFilled  number of buffered bytes to write; must be a multiple of the record size
 * @return always true
 */
bool Materializer::flushOutputBuffer(u_int32_t sizeFilled)
{
	TRACE (false);
#if defined(VERBOSEL2)
	traceprintf ("%s: output buffer flushed with %llu rows produced\n", outputFileName.c_str(), static_cast<unsigned long long>(renderer->_produced));
#endif
	Assert (sizeFilled % renderer->_recordSize == 0, __FILE__, __LINE__);
	outputFile.write(reinterpret_cast<const char *>(outputBuffer->data()), sizeFilled);
	// Metrics: switch device if necessary, then log the write against the device reported.
	int writtenDevice = switchDevice(sizeFilled);
	Metrics::write(writtenDevice, sizeFilled);
	return true;
} // Materializer::flushOutputBuffer
/**
 * Move spilling off the SSD when Metrics no longer offers it for another `sizeFilled`
 * bytes.
 *
 * On a switch, the spill file is renamed to record the switch point (the rows produced
 * so far) and the new device. The output buffer is then reallocated for the new device's
 * page size.
 *
 * At most one switch happens: once off the SSD, this function does nothing. It does not
 * switch back from HDD to SSD, for simplicity.
 *
 * @param sizeFilled  size of the write that prompted the check
 * @return the device type in use after the call
 */
int Materializer::switchDevice (u_int32_t sizeFilled)
{
	if (deviceType != STORAGE_SSD) return deviceType;
	auto newDeviceType = Metrics::getAvailableStorage(sizeFilled);
	if (newDeviceType == deviceType) return deviceType;

	deviceType = newDeviceType;
	string newFileName = outputFileName + string("-") + std::to_string(renderer->_produced) + string("-device") + std::to_string(deviceType);
	// The ofstream stays open across the rename; on POSIX the open descriptor follows the file.
	if (std::rename(outputFileName.c_str(), newFileName.c_str()) == 0) {
		outputFileName = newFileName;
	} else {
		// Keep the old name: the data is still in the old file, so reporting the new
		// name would point readers at a file that does not exist.
		std::cerr << "Materializer: failed to rename " << outputFileName << " to " << newFileName << '\n';
	}
	// The buffer's contents have already been written by the caller; resize for the new device.
	delete outputBuffer;
	outputBuffer = new Buffer(Metrics::getParams(deviceType).pageSize / renderer->_recordSize, renderer->_recordSize);
#if defined(VERBOSEL1) || defined(VERBOSEL2)
	traceprintf ("Switched to a new device %d at %llu, now output file is %s\n", deviceType, static_cast<unsigned long long>(renderer->_produced), outputFileName.c_str());
#endif
	return deviceType;
} // Materializer::switchDevice