Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-13554: [C++] Remove deprecated Scanner::Scan #11991

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
fbd7ef0
ARROW-13554: First pass at removing sync scanner
westonpace Dec 18, 2021
b3ac3f2
ARROW-13554: Adding missing exports for windows
westonpace Dec 18, 2021
956507b
ARROW-13554: Added a warning if the user is trying to avoid the async…
westonpace Jan 5, 2022
832e6a6
ARROW-13554: Fixed a compile error introduced (I think) by rebase
westonpace Jan 5, 2022
e016fe0
ARROW-13554: Testing the removal of use_async from cglib
westonpace Jan 5, 2022
9958c07
ARROW-13554: Fixed a logic error in the orc async conversion
westonpace Jan 6, 2022
8b6a6e8
ARROW-13554: Lint
westonpace Jan 6, 2022
ea5a2a2
ARROW-13554: Missed an ARROW_DS_EXPORT for Windows
westonpace Jan 6, 2022
90248e4
ARROW-13554: The JNI dataset interface does not seem to handle offset…
westonpace Jan 6, 2022
1a01c46
ARROW-13554: Addressing PR review comments. Added back in FromReader…
westonpace Jan 6, 2022
f2327cd
ARROW-13554: Fixed up offset handling logic in JNI so it only does th…
westonpace Jan 6, 2022
50a1b69
ARROW-13554: Reverting a few whitespace formatting changes I missed i…
westonpace Jan 6, 2022
e845f48
ARROW-13554: Renamed the ProjectionDescr::MakeFromXyz methods to Proj…
westonpace Jan 6, 2022
d516521
ARROW-13554: Missed a rename of ProjectionDescr methods
westonpace Jan 6, 2022
7ac93dd
ARROW-13554: Added deprecation warnings in R for use_async instead of…
westonpace Jan 6, 2022
3629ed1
ARROW-13554: Minor fix in projectiondescr methods. Changed python to…
westonpace Jan 7, 2022
666570b
ARROW-13554: Removed a use_async reference in the python docs.
westonpace Jan 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 1 addition & 22 deletions c_glib/arrow-dataset-glib/scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,7 @@ typedef struct GADatasetScannerBuilderPrivate_ {
} GADatasetScannerBuilderPrivate;

enum {
PROP_SCANNER_BUILDER = 1,
PROP_USE_ASYNC,
PROP_SCANNER_BUILDER = 1
};

G_DEFINE_TYPE_WITH_PRIVATE(GADatasetScannerBuilder,
Expand Down Expand Up @@ -173,11 +172,6 @@ gadataset_scanner_builder_set_property(GObject *object,
*static_cast<std::shared_ptr<arrow::dataset::ScannerBuilder> *>(
g_value_get_pointer(value));
break;
case PROP_USE_ASYNC:
garrow::check(nullptr,
priv->scanner_builder->UseAsync(g_value_get_boolean(value)),
"[scanner-builder][use-async][set]");
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
break;
Expand Down Expand Up @@ -206,21 +200,6 @@ gadataset_scanner_builder_class_init(GADatasetScannerBuilderClass *klass)
static_cast<GParamFlags>(G_PARAM_WRITABLE |
G_PARAM_CONSTRUCT_ONLY));
g_object_class_install_property(gobject_class, PROP_SCANNER_BUILDER, spec);

arrow::dataset::ScanOptions default_options;
/**
* GADatasetScannerBuilder:use-async:
*
* Whether or not async mode is used.
*
* Since: 6.0.0
*/
spec = g_param_spec_boolean("use-async",
westonpace marked this conversation as resolved.
Show resolved Hide resolved
"Use async",
"Whether or not async mode is used",
default_options.use_async,
static_cast<GParamFlags>(G_PARAM_WRITABLE));
g_object_class_install_property(gobject_class, PROP_USE_ASYNC, spec);
}

/**
Expand Down
1 change: 0 additions & 1 deletion c_glib/test/dataset/test-file-system-dataset.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ def test_read_write
count: build_int32_array([1, 10, 2, 3]))
table_reader = Arrow::TableBatchReader.new(table)
scanner_builder = ArrowDataset::ScannerBuilder.new(table_reader)
scanner_builder.use_async = true
scanner = scanner_builder.finish
options = ArrowDataset::FileSystemDatasetWriteOptions.new
options.file_write_options = @format.default_write_options
Expand Down
5 changes: 0 additions & 5 deletions c_glib/test/dataset/test-scanner-builder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,4 @@ def test_filter
scanner.to_table)
end

def test_use_async
@builder.use_async = true
scanner = @builder.finish
assert_equal(@table, scanner.to_table)
end
end
23 changes: 0 additions & 23 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,29 +80,6 @@ InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches,
physical_schema_ = record_batches_.empty() ? schema({}) : record_batches_[0]->schema();
}

Result<ScanTaskIterator> InMemoryFragment::Scan(std::shared_ptr<ScanOptions> options) {
// Make an explicit copy of record_batches_ to ensure Scan can be called
// multiple times.
auto batches_it = MakeVectorIterator(record_batches_);

auto batch_size = options->batch_size;
// RecordBatch -> ScanTask
auto self = shared_from_this();
auto fn = [=](std::shared_ptr<RecordBatch> batch) -> std::shared_ptr<ScanTask> {
RecordBatchVector batches;

auto n_batches = bit_util::CeilDiv(batch->num_rows(), batch_size);
for (int i = 0; i < n_batches; i++) {
batches.push_back(batch->Slice(batch_size * i, batch_size));
}

return ::arrow::internal::make_unique<InMemoryScanTask>(std::move(batches),
std::move(options), self);
};

return MakeMapIterator(fn, std::move(batches_it));
}

Result<RecordBatchGenerator> InMemoryFragment::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) {
struct State {
Expand Down
13 changes: 0 additions & 13 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,6 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
/// The schema is cached after being read once, or may be specified at construction.
Result<std::shared_ptr<Schema>> ReadPhysicalSchema();

/// \brief Scan returns an iterator of ScanTasks, each of which yields
/// RecordBatches from this Fragment.
///
/// Note that batches yielded using this method will not be filtered and may not align
/// with the Fragment's schema. In particular, note that columns referenced by the
/// filter may be present in yielded batches even if they are not projected (so that
/// they are available when a filter is applied). Additionally, explicitly projected
/// columns may be absent if they were not present in this fragment.
///
/// To receive a record batch stream which is fully filtered and projected, use Scanner.
virtual Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) = 0;

/// An asynchronous version of Scan
virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) = 0;
Expand Down Expand Up @@ -133,7 +121,6 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
explicit InMemoryFragment(RecordBatchVector record_batches,
compute::Expression = compute::literal(true));

Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) override;
Future<util::optional<int64_t>> CountRows(
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest {
}
options_ = std::make_shared<ScanOptions>();
options_->dataset_schema = schema;
ASSERT_OK(SetProjection(options_.get(), schema->field_names()));
ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(*schema));
SetProjection(options_.get(), std::move(projection));
ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema));
ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset_->GetFragments());
AssertFragmentsAreFromPath(std::move(fragment_it), paths);
Expand Down
67 changes: 3 additions & 64 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@
#include <unordered_map>
#include <vector>

#include "arrow/compute/api_scalar.h"
#include "arrow/compute/exec/forest_internal.h"
#include "arrow/compute/exec/subtree_internal.h"
#include "arrow/dataset/dataset_internal.h"
#include "arrow/dataset/dataset_writer.h"
#include "arrow/dataset/scanner.h"
#include "arrow/dataset/scanner_internal.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_util.h"
#include "arrow/io/compressed.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/compression.h"
#include "arrow/util/iterator.h"
#include "arrow/util/macros.h"
Expand All @@ -45,6 +46,7 @@

namespace arrow {

using internal::checked_cast;
using internal::checked_pointer_cast;

namespace dataset {
Expand Down Expand Up @@ -111,66 +113,10 @@ Result<std::shared_ptr<FileFragment>> FileFormat::MakeFragment(
std::move(partition_expression), std::move(physical_schema)));
}

// The following implementation of ScanBatchesAsync is both ugly and terribly inefficient.
// Each of the formats should provide their own efficient implementation. However, this
// is a reasonable starting point or implementation for a dummy/mock format.
Result<RecordBatchGenerator> FileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<FileFragment>& file) const {
ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file));
struct State {
State(std::shared_ptr<ScanOptions> scan_options, ScanTaskIterator scan_task_it)
: scan_options(std::move(scan_options)),
scan_task_it(std::move(scan_task_it)),
current_rb_it(),
finished(false) {}

std::shared_ptr<ScanOptions> scan_options;
ScanTaskIterator scan_task_it;
RecordBatchIterator current_rb_it;
bool finished;
};
struct Generator {
Future<std::shared_ptr<RecordBatch>> operator()() {
while (!state->finished) {
if (!state->current_rb_it) {
RETURN_NOT_OK(PumpScanTask());
if (state->finished) {
return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
}
}
ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next());
if (IsIterationEnd(next_batch)) {
state->current_rb_it = RecordBatchIterator();
} else {
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(next_batch);
}
}
return AsyncGeneratorEnd<std::shared_ptr<RecordBatch>>();
}
Status PumpScanTask() {
ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next());
if (IsIterationEnd(next_task)) {
state->finished = true;
} else {
ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute());
}
return Status::OK();
}
std::shared_ptr<State> state;
};
return Generator{std::make_shared<State>(scan_options, std::move(scan_task_it))};
}

Result<std::shared_ptr<Schema>> FileFragment::ReadPhysicalSchemaImpl() {
return format_->Inspect(source_);
}

Result<ScanTaskIterator> FileFragment::Scan(std::shared_ptr<ScanOptions> options) {
auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
return format_->ScanFile(options, self);
}

Result<RecordBatchGenerator> FileFragment::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) {
auto self = std::dynamic_pointer_cast<FileFragment>(shared_from_this());
Expand Down Expand Up @@ -391,13 +337,6 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {

Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
std::shared_ptr<Scanner> scanner) {
if (!scanner->options()->use_async) {
return Status::Invalid(
"A dataset write operation was invoked on a scanner that was configured for "
"synchronous scanning. Dataset writing requires a scanner configured for "
"asynchronous scanning. Please recreate the scanner with the use_async or "
"UseAsync option set to true");
}
const io::IOContext& io_context = scanner->options()->io_context;
std::shared_ptr<compute::ExecContext> exec_context =
std::make_shared<compute::ExecContext>(io_context.pool(),
Expand Down
8 changes: 1 addition & 7 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,15 +147,10 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
/// \brief Return the schema of the file if possible.
virtual Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const = 0;

/// \brief Open a FileFragment for scanning.
/// May populate lazy properties of the FileFragment.
virtual Result<ScanTaskIterator> ScanFile(
virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const = 0;

virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& file) const;
virtual Future<util::optional<int64_t>> CountRows(
const std::shared_ptr<FileFragment>& file, compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options);
Expand Down Expand Up @@ -186,7 +181,6 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this<FileForma
/// \brief A Fragment that is stored in a file with a known format
class ARROW_DS_EXPORT FileFragment : public Fragment {
public:
Result<ScanTaskIterator> Scan(std::shared_ptr<ScanOptions> options) override;
Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) override;
Future<util::optional<int64_t>> CountRows(
Expand Down
45 changes: 0 additions & 45 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,42 +198,6 @@ static RecordBatchGenerator GeneratorFromReader(
return MakeFromFuture(std::move(gen_fut));
}

/// \brief A ScanTask backed by an Csv file.
class CsvScanTask : public ScanTask {
public:
CsvScanTask(std::shared_ptr<const CsvFileFormat> format,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<FileFragment> fragment)
: ScanTask(std::move(options), fragment),
format_(std::move(format)),
source_(fragment->source()) {}

Result<RecordBatchIterator> Execute() override {
auto reader_fut = OpenReaderAsync(source_, *format_, options(),
::arrow::internal::GetCpuThreadPool());
auto reader_gen = GeneratorFromReader(std::move(reader_fut), options()->batch_size);
return MakeGeneratorIterator(std::move(reader_gen));
}

Future<RecordBatchVector> SafeExecute(Executor* executor) override {
auto reader_fut = OpenReaderAsync(source_, *format_, options(), executor);
auto reader_gen = GeneratorFromReader(std::move(reader_fut), options()->batch_size);
return CollectAsyncGenerator(reader_gen);
}

Future<> SafeVisit(
Executor* executor,
std::function<Status(std::shared_ptr<RecordBatch>)> visitor) override {
auto reader_fut = OpenReaderAsync(source_, *format_, options(), executor);
auto reader_gen = GeneratorFromReader(std::move(reader_fut), options()->batch_size);
return VisitAsyncGenerator(reader_gen, visitor);
}

private:
std::shared_ptr<const CsvFileFormat> format_;
FileSource source_;
};

bool CsvFileFormat::Equals(const FileFormat& format) const {
if (type_name() != format.type_name()) return false;

Expand All @@ -260,15 +224,6 @@ Result<std::shared_ptr<Schema>> CsvFileFormat::Inspect(const FileSource& source)
return reader->schema();
}

Result<ScanTaskIterator> CsvFileFormat::ScanFile(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const {
auto this_ = checked_pointer_cast<const CsvFileFormat>(shared_from_this());
auto task = std::make_shared<CsvScanTask>(std::move(this_), options, fragment);

return MakeVectorIterator<std::shared_ptr<ScanTask>>({std::move(task)});
}

Result<RecordBatchGenerator> CsvFileFormat::ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<FileFragment>& file) const {
Expand Down
5 changes: 0 additions & 5 deletions cpp/src/arrow/dataset/file_csv.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
/// \brief Return the schema of the file if possible.
Result<std::shared_ptr<Schema>> Inspect(const FileSource& source) const override;

/// \brief Open a file for scanning
Result<ScanTaskIterator> ScanFile(
const std::shared_ptr<ScanOptions>& options,
const std::shared_ptr<FileFragment>& fragment) const override;

Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& scan_options,
const std::shared_ptr<FileFragment>& file) const override;
Expand Down
10 changes: 2 additions & 8 deletions cpp/src/arrow/dataset/file_csv_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,9 @@ class TestCsvFileFormat : public FileFormatFixtureMixin<CsvFormatHelper>,
return ::arrow::internal::make_unique<FileSource>(info, fs, GetCompression());
}

RecordBatchIterator Batches(ScanTaskIterator scan_task_it) {
return MakeFlattenIterator(MakeMaybeMapIterator(
[](std::shared_ptr<ScanTask> scan_task) { return scan_task->Execute(); },
std::move(scan_task_it)));
}

RecordBatchIterator Batches(Fragment* fragment) {
EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_));
return Batches(std::move(scan_task_it));
EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_));
return MakeGeneratorIterator(batch_gen);
}
};

Expand Down
Loading