xread: an "extract+read" #1950

Open
st-pasha opened this issue Aug 5, 2019 · 1 comment
Labels: cust-goldmansachs, design-doc, fread

Comments


st-pasha commented Aug 5, 2019

This is a proposal for implementing a new function xread(), which would be conceptually similar to fread(), but much lazier. In particular, xread() would parse only the first n_sample_lines=100 lines of the file, detecting general information such as the parse settings, the number of columns, and their names and types. After that, xread() returns a "lazy frame" object, which can be used with the standard [i, j] notation:

  • [:n, :] returns just the first n rows of the dataset (equivalent to the max_nrows parameter);
  • [1000:2000, :] returns rows 1000 to 2000. Generally, we should allow the user to request consecutive ranges on the same lazy frame. This would be equivalent to "chunked reading", which is a popular request;
  • [-100:, :] returns the last 100 rows. For "file object" sources, this would require reading the file in its entirety. For files on disk (or in memory), we could try to parse from the end of the file;
  • [:, :5] returns only the first 5 columns;
  • [:, ["A", "B", "C"]] returns the columns named A, B and C;
  • [:, f + {"wage_per_hour": f.salary/f.hours}] returns all columns plus an additional column containing the salary divided by hours;
  • [f.status != '', :] returns only the rows where the field status is not empty;
  • [dt.random.randu() < 0.2, :] randomly samples 20% of the rows;
  • ...

These are just some examples of what could be possible. Obviously, the i and j selectors can be combined into a single [i, j] selector too. It should even be possible to add join operations and groupbys into the mix (provided that we use single-pass hash-based grouping).
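To make the proposed semantics concrete, here is a very rough plain-Python sketch. Everything in it is hypothetical (the `LazyFrame` class, storing the source as a string, re-parsing on every access); it only illustrates how a small header sample plus deferred row/column selection could fit together behind the [i, j] notation:

```python
import csv
import io
from itertools import islice

class LazyFrame:
    """Hypothetical stand-in for the object that xread() would return."""

    def __init__(self, source, n_sample_lines=100):
        self._source = source
        # Sample only the first n_sample_lines to detect column names and
        # count, mimicking xread()'s cheap initial scan.
        sample = list(islice(csv.reader(io.StringIO(source)), n_sample_lines))
        self.names = sample[0]
        self.ncols = len(self.names)

    def __getitem__(self, ij):
        i, j = ij
        reader = csv.reader(io.StringIO(self._source))
        next(reader)                                      # skip the header row
        if isinstance(i, slice):
            rows = list(islice(reader, i.start, i.stop))  # consecutive range
        else:
            rows = [r for r in reader if i(r)]            # predicate row filter
        cols = (range(self.ncols)[j] if isinstance(j, slice)
                else [self.names.index(n) for n in j])
        return [[r[c] for c in cols] for r in rows]

def xread(text):
    return LazyFrame(text)

lf = xread("A,B,C\n1,2,3\n4,5,6\n7,8,9\n")
print(lf[:2, :])                        # first 2 rows, all columns
print(lf[:, ["A", "C"]])                # columns selected by name
print(lf[lambda r: r[0] != "4", :1])    # filtered rows, first column only
```

Note that this toy re-reads the source from the start on every access; the real proposal would instead keep the reader suspended at its last position, as discussed in the implementation note below.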

@st-pasha st-pasha added fread Issues related to parsing any input files via fread function design-doc Generic discussion / roadmap how some major new functionality can be implemented labels Aug 5, 2019
@st-pasha st-pasha self-assigned this Aug 5, 2019

st-pasha commented Aug 8, 2019

Implementation note

Whereas the standard DT[i,j,by] selector takes advantage of the fact that the data is readily present in memory and allows random access and multiple passes, this is not so with the "lazy frame" as produced by xread. The key property of this lazy frame is that it is essentially single-pass (with some buffering), and therefore the algorithm for applying the [i,j,by] selector must be rethought accordingly.
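The single-pass constraint can be illustrated with a small generator pipeline (illustrative names, not datatable internals): each row is observed exactly once as the chunks stream past, so the i filter must be decided online per row, and the j projection is applied immediately rather than on a materialized frame:

```python
def single_pass_select(chunks, row_filter, col_indices):
    """Yield selected cells chunk by chunk; each input row is seen once."""
    for chunk in chunks:                  # chunks arrive in order
        for row in chunk:
            if row_filter(row):           # i: decided online, no second pass
                yield [row[c] for c in col_indices]   # j: column projection

chunks = [[[1, 2], [3, 4]], [[5, 6]]]
out = list(single_pass_select(chunks, lambda r: r[0] > 1, [1]))
print(out)    # [[4], [6]]
```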

Another crucial property of the xread-frame is that the principal driver of the materialization process is the ordered read loop. Thus, the process can be visualized as follows:

+--------+           +---------+
|  raw   |  acquire  |  main   |   filter    +--------+
|  data  |<----------| ordered |------------>| Result |
| source |   data    |  loop   |  transform  +--------+
+--------+           +---------+

The main ordered loop thus pulls the data from the "raw data source", puts it into an internal buffer, and then pushes that data through the "filter/transform" stage, finally storing it into the resulting frame.

Acquire data

In this step, the data is retrieved from some "raw data source", stored in the intermediate read buffer, and is passed on to the main ordered loop. Crucially, the main loop is the driver here: it commands how much of the data to retrieve.

  • Memory-mapped files are the simplest case: the data can essentially be accessed randomly, and "acquiring" data is as simple as passing a pointer;
  • Generic file objects will receive the data in chunks and store it in an intermediate buffer, until the main ordered loop signals that the data can be released. This also includes various streaming applications: data loaded from a URL, from an input pipe, from a socket, from an un-archiver algorithm, from an encoding decoder, from a decryptor, etc. It would be beneficial to have a dedicated thread performing this reading, because this I/O will likely be the bottleneck of the operation.
  • In some cases the underlying data supports random access, though a non-trivial amount of processing may still be necessary. Possible use cases include: decoding from certain encodings, unarchiving a parallel-gzip file, or running a line pre-processor. In such cases we could just handle data retrieval in each worker thread, the same as in the simple memory-mapped case.
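A dedicated reader thread for streaming sources might look like the following sketch (stdlib only; the chunk contents and queue size are made up). A bounded queue gives natural backpressure, so the reader thread never runs too far ahead of the main ordered loop that consumes from it:

```python
import queue
import threading

def reader_thread(source_chunks, q):
    # Dedicated I/O thread: pushes raw chunks into a bounded queue.
    for chunk in source_chunks:
        q.put(chunk)          # blocks when the buffer is full (backpressure)
    q.put(None)               # end-of-stream sentinel

q = queue.Queue(maxsize=2)
t = threading.Thread(target=reader_thread, args=(["c1", "c2", "c3"], q))
t.start()

seen = []
while (chunk := q.get()) is not None:
    seen.append(chunk)        # the "main ordered loop" consumes in order
t.join()
print(seen)    # ['c1', 'c2', 'c3']
```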

Multiple data sources can be effectively chained, for example a file that is loaded from a URL may need to be decompressed, and then perhaps even decoded from a legacy encoding into UTF-8.
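The chaining of sources can be demonstrated with the standard library: here a gzipped, latin-1-encoded "download" (simulated by an in-memory buffer) is decompressed and then decoded into proper text before any CSV parsing would happen. This is only a sketch of the layering, not datatable's actual I/O code:

```python
import codecs
import gzip
import io

# Simulated "download": gzipped bytes in a legacy (latin-1) encoding.
raw = gzip.compress("A,B\ncafé,1\n".encode("latin-1"))

stream = io.BytesIO(raw)                          # stage 1: raw byte source
unzipped = gzip.GzipFile(fileobj=stream)          # stage 2: un-archiver
reader = codecs.getreader("latin-1")(unzipped)    # stage 3: encoding decoder
decoded = reader.read()
print(decoded)    # the original text, ready for the CSV parser
```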

After acquiring each data chunk, we parse it as CSV into a table of values which is stored internally as a buffer. This buffer must then pass through the filter/transform stage.

Filter/transform data

The buffer that was obtained when parsing a CSV chunk is instantiated into a virtual Frame. This Frame will have virtual columns pointing directly to the underlying data buffer. At this point we:

  1. Evaluate the filter expression i, constructing a boolean mask of those rows that will be saved into the output, and computing the count of those rows;
  2. If there are any string columns in the output (or any other variable-width types), pre-materialize them so that we know the exact size that this column will take in the output;
  3. In the ordered clause, advance the pointers to each column according to the number of rows selected and the total size of the string data in those rows;
  4. In the post-ordered step, copy the data from each intermediate buffer into the resulting frame.
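The four steps can be mimicked in a single-threaded sketch; here the shared `state["offset"]` stands in for the ordered clause that serializes claiming a range of the output, and the first tuple element plays the role of a variable-width string column (all names are hypothetical):

```python
def process_chunk(chunk, mask_fn, out_rows, state):
    mask = [mask_fn(row) for row in chunk]          # 1. evaluate the filter i
    kept = [row for row, m in zip(chunk, mask) if m]
    str_bytes = sum(len(row[0]) for row in kept)    # 2. pre-size string column
    start = state["offset"]                         # 3. "ordered": claim range
    state["offset"] += len(kept)
    state["str_bytes"] += str_bytes
    out_rows[start:start + len(kept)] = kept        # 4. post-ordered copy
    return len(kept)

state = {"offset": 0, "str_bytes": 0}
out = [None] * 10
process_chunk([("aa", 1), ("b", 2)], lambda r: r[1] > 1, out, state)
process_chunk([("ccc", 3)], lambda r: True, out, state)
print(out[:state["offset"]], state["str_bytes"])   # [('b', 2), ('ccc', 3)] 4
```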

Special notes about handling slice selectors A:B:C in i:

  • the first A rows are simply ignored;
  • rows up to B are included into the output. Once we reach row B, we suspend all reading and exit the parallel region (though we execute any pending post-ordered steps). The state of the reading process must be preserved at this point (and stored with the produced lazy frame object). If there was a dedicated thread reading the input, that thread must also be suspended and its state (together with any internal buffers) saved;
  • if the user later queries the same xread-frame for another slice starting at B, then the frame should restore the parallel team from the same point where it stopped the previous time;
  • a negative step C can be handled by reversing the slice, and then applying a [::-1] slice on top of the Result frame.
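The suspend/resume behaviour for consecutive slices maps naturally onto a generator that keeps its position alive between calls: a query for [1000:2000] would continue exactly where [:1000] stopped. This is only a single-threaded analogy for pausing and restoring the parallel team (names are illustrative):

```python
def make_reader(lines):
    # Stands in for the parsing loop over the raw input.
    yield from iter(lines)

class ResumableFrame:
    """Hypothetical xread-frame that suspends its reader between queries."""

    def __init__(self, lines):
        self._reader = make_reader(lines)
        self._pos = 0

    def take(self, b):
        """Return rows up to absolute row b, then suspend the reader there."""
        out = []
        while self._pos < b:
            out.append(next(self._reader))
            self._pos += 1
        return out

rf = ResumableFrame(["r%d" % i for i in range(5)])
print(rf.take(2))   # ['r0', 'r1'] -- reading suspended at row 2
print(rf.take(4))   # ['r2', 'r3'] -- resumed from the saved position
```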
