Skip to content

Commit

Permalink
Do not do rereading step in fread (#2558)
Browse files Browse the repository at this point in the history
Since the very early days, the design of fread was such that if any "type-bumps" happen in the middle of a file, then the corresponding column(s) will be marked as "require re-reading", and then at the end we would re-parse the entire file using the new column type. This approach has obvious drawbacks:
- the amount of time necessary to read the file almost doubles (and type bumps happen more often than you'd think);
- the logic behind type-bumping is very error-prone;
- it is impossible to read a stream-like input, where the data cannot be arbitrarily rewound;

The new approach for handling type-bumping is the following:
- When a type-bump occurs while reading a chunk, we enter the ordered section and temporarily suspend execution of other threads;
- While all other threads are paused, we:
  - "archive" the column which was type-bumped;
  - update the global types array with the new column parse types;
- After that, the parallel execution resumes from the start of the type-bumped chunk;
- (the process above may occur multiple times with different columns or different chunks);
- In the end, when the final frame is constructed, the columns that were composed of multiple chunks are rbound together with automatic type promotion.

Closes #1843
Closes #1446
  • Loading branch information
st-pasha authored Aug 6, 2020
1 parent 9df9d7b commit ca2f1e0
Show file tree
Hide file tree
Showing 29 changed files with 759 additions and 450 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ clean::
rm -rf build
rm -rf dist
rm -rf datatable.egg-info
rm -rf htmlcov
rm -f *.so
rm -f src/datatable/lib/_datatable*.pyd
rm -f src/datatable/lib/_datatable*.so
Expand Down
26 changes: 3 additions & 23 deletions src/core/csv/fread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ std::unique_ptr<DataTable> FreadReader::read_all()
int nUserBumped = 0;
for (size_t i = 0; i < ncols; i++) {
auto& col = preframe.column(i);
col.reset_type_bumped();
if (col.is_dropped()) {
ndropped++;
continue;
Expand Down Expand Up @@ -100,37 +99,18 @@ std::unique_ptr<DataTable> FreadReader::read_all()
//****************************************************************************
// [6] Read the data
//****************************************************************************
bool firstTime = true;

auto typesPtr = preframe.get_ptypes();
dt::read::PT* types = typesPtr.data(); // This pointer is valid until `typesPtr` goes out of scope

read: // we'll return here to reread any columns with out-of-sample type exceptions
{
auto _ = logger_.section("[6] Read the data");
job->set_message(firstTime? "Reading data" : "Rereading data");
dt::progress::subtask subwork(*job, firstTime? WORK_READ : WORK_REREAD);
job->set_message("Reading data");
dt::progress::subtask subwork(*job, WORK_READ);
dt::read::FreadParallelReader scr(*this, types);
scr.read_all();
subwork.done();

if (firstTime) {
fo.t_data_read = fo.t_data_reread = wallclock();
} else {
fo.t_data_reread = wallclock();
}
size_t ncols_to_reread = preframe.n_columns_to_reread();
xassert((ncols_to_reread > 0) == reread_scheduled);
if (ncols_to_reread) {
fo.n_cols_reread += ncols_to_reread;
D() << dt::log::plural(ncols_to_reread, "column")
<< " need to be re-read because their types have changed";
preframe.prepare_for_rereading();
firstTime = false;
reread_scheduled = false;
goto read;
}

fo.t_data_read = wallclock();
fo.n_rows_read = preframe.nrows_written();
fo.n_cols_read = preframe.n_columns_in_output();
}
Expand Down
1 change: 1 addition & 0 deletions src/core/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,7 @@ void GenericReader::report_columns_to_python() {
auto& coli = preframe.column(i);
py::robj elem = newTypesList[i];
coli.set_rtype(elem.to_int64());
coli.outcol().set_stype(coli.get_stype());
if (newNamesList && coli.get_rtype() != RT::RDrop) {
XAssert(j < newNamesList.size());
elem = newNamesList[j++];
Expand Down
1 change: 0 additions & 1 deletion src/core/csv/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class GenericReader
public:
static constexpr size_t WORK_PREPARE = 2;
static constexpr size_t WORK_READ = 100;
static constexpr size_t WORK_REREAD = 60;
static constexpr size_t WORK_DECODE_UTF16 = 50;
std::shared_ptr<dt::progress::work> job;
Buffer input_mbuf;
Expand Down
20 changes: 6 additions & 14 deletions src/core/csv/reader_fread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,8 @@ int64_t FreadReader::parse_single_line(dt::read::ParseContext& fctx)
fctx.skip_whitespace();

const char* fieldStart = tch;
auto ptype_iter = col.get_ptype_iterator(&fctx.quoteRule);
auto ptype_iter = dt::read::PtypeIterator(
col.get_ptype(), col.get_rtype(), &fctx.quoteRule);
while (true) {
// Try to parse using the regular field parser
tch = fieldStart;
Expand Down Expand Up @@ -532,6 +533,7 @@ int64_t FreadReader::parse_single_line(dt::read::ParseContext& fctx)
}
if (j < ncols && ptype_iter.has_incremented()) {
col.set_ptype(*ptype_iter);
col.outcol().set_stype(col.get_stype());
}
j++;

Expand Down Expand Up @@ -949,7 +951,6 @@ FreadObserver::FreadObserver(const dt::read::GenericReader& g_) : g(g_) {
t_column_types_detected = 0;
t_frame_allocated = 0;
t_data_read = 0;
t_data_reread = 0;
time_read_data = 0.0;
time_push_data = 0.0;
input_size = 0;
Expand All @@ -958,7 +959,6 @@ FreadObserver::FreadObserver(const dt::read::GenericReader& g_) : g(g_) {
n_lines_sampled = 0;
n_rows_allocated = 0;
n_cols_allocated = 0;
n_cols_reread = 0;
allocation_size = 0;
read_data_nthreads = 0;
}
Expand All @@ -974,8 +974,7 @@ void FreadObserver::report() {
t_parse_parameters_detected <= t_column_types_detected &&
t_column_types_detected <= t_frame_allocated &&
t_frame_allocated <= t_data_read &&
t_data_read <= t_data_reread &&
t_data_reread <= t_end &&
t_data_read <= t_end &&
read_data_nthreads > 0);
double total_time = std::max(t_end - t_start + g.t_open_input, 1e-6);
int total_minutes = static_cast<int>(total_time/60);
Expand All @@ -984,11 +983,10 @@ void FreadObserver::report() {
double types_time = t_column_types_detected - t_parse_parameters_detected;
double alloc_time = t_frame_allocated - t_column_types_detected;
double read_time = t_data_read - t_frame_allocated;
double reread_time = t_data_reread - t_data_read;
double makedt_time = t_end - t_data_reread;
double makedt_time = t_end - t_data_read;
double t_read = time_read_data.load() / read_data_nthreads;
double t_push = time_push_data.load() / read_data_nthreads;
double time_wait_data = read_time + reread_time - t_read - t_push;
double time_wait_data = read_time - t_read - t_push;
int p = total_time < 10 ? 5 :
total_time < 100 ? 6 :
total_time < 1000 ? 7 : 8;
Expand Down Expand Up @@ -1021,12 +1019,6 @@ void FreadObserver::report() {
g.d() << " + " << ff(p, 3, read_time) << "s ("
<< ff(2, 0, 100 * read_time / total_time) << "%)"
<< " reading data";
if (n_cols_reread) {
g.d() << " + " << ff(p, 3, reread_time) << "s ("
<< ff(2, 0, 100 * reread_time / total_time) << "%)"
<< " re-reading " << n_cols_reread
<< " columns due to out-of-sample type exceptions";
}
g.d() << " = " << ff(p, 3, t_read) << "s (" << ff(2, 0, 100 * t_read / total_time) << "%) reading into row-major buffers";
g.d() << " = " << ff(p, 3, t_push) << "s (" << ff(2, 0, 100 * t_push / total_time) << "%) saving into the output frame";
g.d() << " = " << ff(p, 3, time_wait_data) << "s (" << ff(2, 0, 100 * time_wait_data / total_time) << "%) waiting";
Expand Down
2 changes: 0 additions & 2 deletions src/core/csv/reader_fread.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ class FreadObserver {
double t_column_types_detected;
double t_frame_allocated;
double t_data_read;
double t_data_reread;
dt::atomic<double> time_read_data;
dt::atomic<double> time_push_data;
size_t n_rows_read;
Expand All @@ -62,7 +61,6 @@ class FreadObserver {
size_t n_lines_sampled;
size_t n_rows_allocated;
size_t n_cols_allocated;
size_t n_cols_reread;
size_t allocation_size;
size_t read_data_nthreads;
std::vector<std::string> messages;
Expand Down
35 changes: 35 additions & 0 deletions src/core/csv/reader_parsers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,41 @@ ParserLibrary::ParserLibrary() {



//------------------------------------------------------------------------------
// PtypeIterator
//------------------------------------------------------------------------------
namespace dt {
namespace read {


// Create an iterator positioned at parse type `pt`, remembering the
// user-requested column type `rt` and a borrowed pointer `qr_ptr` to
// the shared quote-rule variable.
PtypeIterator::PtypeIterator(PT pt, RT rt, int8_t* qr_ptr)
  : pqr(qr_ptr),
    rtype(rt),
    orig_ptype(pt),
    curr_ptype(pt) {}


// The parse type the iterator currently points at.
PT PtypeIterator::operator*() const {
  return curr_ptype;
}


// The user-requested column type supplied at construction.
RT PtypeIterator::get_rtype() const {
  return rtype;
}


// Advance to the next parse type in the bump sequence. Once the last
// parse type (PT::Str32) has been reached, further increments bump
// the shared quote rule (via `pqr`) instead of the parse type.
PtypeIterator& PtypeIterator::operator++() {
  if (curr_ptype >= PT::Str32) {
    *pqr += 1;
  } else {
    curr_ptype = static_cast<PT>(curr_ptype + 1);
  }
  return *this;
}


// True if the iterator has moved away from its starting parse type.
bool PtypeIterator::has_incremented() const {
  return !(curr_ptype == orig_ptype);
}


}}
//------------------------------------------------------------------------------
// ParserIterator
//------------------------------------------------------------------------------
Expand Down
25 changes: 25 additions & 0 deletions src/core/csv/reader_parsers.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,31 @@ class ParserInfo {
//------------------------------------------------------------------------------
// Parser iterators
//------------------------------------------------------------------------------
namespace dt {
namespace read {


// Iterator over the sequence of parse types tried for a single column
// while reading. When a value fails to parse with the current type, the
// iterator is incremented to "bump" the column to the next parse type;
// once the final parse type is reached, further increments bump the
// shared quote rule instead (through the `pqr` pointer).
class PtypeIterator {
  private:
    int8_t* pqr;       // borrowed pointer to the quote-rule variable; bumped when no parse types remain
    RT rtype;          // user-requested column type, fixed at construction
    PT orig_ptype;     // parse type the iterator started from
    PT curr_ptype;     // parse type currently pointed at
    int64_t : 40;      // unnamed bit-field: explicit padding, carries no data

  public:
    PtypeIterator(PT pt, RT rt, int8_t* qr_ptr);
    PT operator*() const;                // current parse type
    PtypeIterator& operator++();         // advance to next parse type (or bump the quote rule at the end)
    bool has_incremented() const;        // true if moved past the starting parse type
    RT get_rtype() const;                // user-requested column type
};


}}


// unused?
class ParserIterator {
private:
int ipt;
Expand All @@ -184,6 +208,7 @@ class ParserIterator {
value_type operator*() const;
};

// unused?
class ParserIterable {
private:
const dt::read::PT ptype;
Expand Down
68 changes: 60 additions & 8 deletions src/core/parallel/api.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,16 @@ void parallel_for_ordered(size_t n_iterations,
function<std::unique_ptr<OrderedTask>()>);


/**
* This class facilitates execution of ordered for-loops. A user
* is expected to derive from this class, overriding methods
* `start(i)`, `order(i)` and `finish(i)`; and then call
* `dt::parallel_for_ordered()` supplying a factory function to
* create instances of the derived class.
*
* The class also has several methods for controlling the execution
 * of the ordered loop. See their descriptions below.
*/
class OrderedTask : public ThreadTask {
enum State : size_t;
private:
Expand All @@ -149,26 +159,68 @@ class OrderedTask : public ThreadTask {
virtual void start(size_t i);
virtual void order(size_t i);
virtual void finish(size_t i);
size_t get_iteration() const noexcept;

void execute() override; // ThreadTask's API

size_t get_num_iterations() const;
// (This method may only be called from an ordered section).
// Change the number of iterations in the ordered job. The new
// number of iterations cannot be less than the number of
// iterations that were already ordered.
//
// If the new number of iterations is smaller than the original
// total count, then it is guaranteed that no task will be
// ordered or finished past `n`, although it is possible that
// some tasks will have started at an index `n` or above.
//
void set_num_iterations(size_t n);
size_t get_num_iterations() const;

// (This method may only be called from an ordered section).
// By the time this method returns, all tasks with the index
// less than the current will have completed their "finish"
// stage. Furthermore, no new tasks will start a "finish" stage
// until the end of the enclosing "ordered" step.
//
void wait_until_all_finalized();

// (This method may only be called from an ordered section).
// This method performs the following sequence of actions:
// - blocks new tasks from entering the "start" stage;
// - waits until there are no tasks executing either the
// "start" or "finish" stages;
// - executes the payload function `f()`;
// - resumes the multithreaded execution, making sure that all
// iterations beginning with the current will be re"start"ed.
  //   More specifically, the current iteration that was
  //   executing an ordered section will not finish. Instead,
// this iteration will start again from the "start" step, then
// execute "ordered" again, and only after that it will
// "finish".
//
// Thus, this method creates a region of execution which behaves
// as-if the ordered loop was cancelled, then f() executed in a
// single-threaded environment, then the ordered loop restarted
// from the same iteration where it broke off.
//
// The programmer must take care not to create an infinite loop
// by repeatedly calling `super_ordered` on each execution of
// the same task.
//
void super_ordered(std::function<void()> f);

size_t get_iteration() const noexcept;
bool is_starting() const noexcept;
bool is_ordering() const noexcept;
bool is_finishing() const noexcept;

void execute() override; // ThreadTask's API

private:
friend class MultiThreaded_OrderedJob;
friend class SingleThreaded_OrderedJob;
void init_parent(OrderedJob* parent);
bool ready_to_start() const noexcept;
bool ready_to_order() const noexcept;
bool ready_to_finish() const noexcept;
bool is_starting() const noexcept;
bool is_ordering() const noexcept;
bool is_finishing() const noexcept;
void advance_state();
void cancel();
void start_iteration(size_t i);
};

Expand Down
Loading

0 comments on commit ca2f1e0

Please sign in to comment.