diff --git a/c_glib/arrow-dataset-glib/scanner.cpp b/c_glib/arrow-dataset-glib/scanner.cpp index 51542bb0a6f4d..07a5d4aeadae9 100644 --- a/c_glib/arrow-dataset-glib/scanner.cpp +++ b/c_glib/arrow-dataset-glib/scanner.cpp @@ -138,8 +138,7 @@ typedef struct GADatasetScannerBuilderPrivate_ { } GADatasetScannerBuilderPrivate; enum { - PROP_SCANNER_BUILDER = 1, - PROP_USE_ASYNC, + PROP_SCANNER_BUILDER = 1 }; G_DEFINE_TYPE_WITH_PRIVATE(GADatasetScannerBuilder, @@ -173,11 +172,6 @@ gadataset_scanner_builder_set_property(GObject *object, *static_cast *>( g_value_get_pointer(value)); break; - case PROP_USE_ASYNC: - garrow::check(nullptr, - priv->scanner_builder->UseAsync(g_value_get_boolean(value)), - "[scanner-builder][use-async][set]"); - break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec); break; @@ -206,21 +200,6 @@ gadataset_scanner_builder_class_init(GADatasetScannerBuilderClass *klass) static_cast(G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY)); g_object_class_install_property(gobject_class, PROP_SCANNER_BUILDER, spec); - - arrow::dataset::ScanOptions default_options; - /** - * GADatasetScannerBuilder:use-async: - * - * Whether or not async mode is used. - * - * Since: 6.0.0 - */ - spec = g_param_spec_boolean("use-async", - "Use async", - "Whether or not async mode is used", - default_options.use_async, - static_cast(G_PARAM_WRITABLE)); - g_object_class_install_property(gobject_class, PROP_USE_ASYNC, spec); } /** diff --git a/c_glib/test/dataset/test-file-system-dataset.rb b/c_glib/test/dataset/test-file-system-dataset.rb index 1aef38fcca55f..0e856b678f860 100644 --- a/c_glib/test/dataset/test-file-system-dataset.rb +++ b/c_glib/test/dataset/test-file-system-dataset.rb @@ -60,7 +60,6 @@ def test_read_write count: build_int32_array([1, 10, 2, 3])) table_reader = Arrow::TableBatchReader.new(table) scanner_builder = ArrowDataset::ScannerBuilder.new(table_reader) - scanner_builder.use_async = true scanner = scanner_builder.finish options = ArrowDataset::FileSystemDatasetWriteOptions.new options.file_write_options = @format.default_write_options diff --git a/c_glib/test/dataset/test-scanner-builder.rb b/c_glib/test/dataset/test-scanner-builder.rb index 5674db4c3f7ef..83a30e9c4ccda 100644 --- a/c_glib/test/dataset/test-scanner-builder.rb +++ b/c_glib/test/dataset/test-scanner-builder.rb @@ -67,9 +67,4 @@ def test_filter scanner.to_table) end - def test_use_async - @builder.use_async = true - scanner = @builder.finish - assert_equal(@table, scanner.to_table) - end end diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 5c714a5d5b734..e292cf4a9bc6a 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -80,29 +80,6 @@ InMemoryFragment::InMemoryFragment(RecordBatchVector record_batches, physical_schema_ = record_batches_.empty() ? schema({}) : record_batches_[0]->schema(); } -Result InMemoryFragment::Scan(std::shared_ptr options) { - // Make an explicit copy of record_batches_ to ensure Scan can be called - // multiple times. - auto batches_it = MakeVectorIterator(record_batches_); - - auto batch_size = options->batch_size; - // RecordBatch -> ScanTask - auto self = shared_from_this(); - auto fn = [=](std::shared_ptr batch) -> std::shared_ptr { - RecordBatchVector batches; - - auto n_batches = bit_util::CeilDiv(batch->num_rows(), batch_size); - for (int i = 0; i < n_batches; i++) { - batches.push_back(batch->Slice(batch_size * i, batch_size)); - } - - return ::arrow::internal::make_unique(std::move(batches), - std::move(options), self); - }; - - return MakeMapIterator(fn, std::move(batches_it)); -} - Result InMemoryFragment::ScanBatchesAsync( const std::shared_ptr& options) { struct State { diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index a02954a23c83d..21df820099e18 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -55,18 +55,6 @@ class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this { /// The schema is cached after being read once, or may be specified at construction. Result> ReadPhysicalSchema(); - /// \brief Scan returns an iterator of ScanTasks, each of which yields - /// RecordBatches from this Fragment. - /// - /// Note that batches yielded using this method will not be filtered and may not align - /// with the Fragment's schema. In particular, note that columns referenced by the - /// filter may be present in yielded batches even if they are not projected (so that - /// they are available when a filter is applied). Additionally, explicitly projected - /// columns may be absent if they were not present in this fragment. - /// - /// To receive a record batch stream which is fully filtered and projected, use Scanner. - virtual Result Scan(std::shared_ptr options) = 0; - /// An asynchronous version of Scan virtual Result ScanBatchesAsync( const std::shared_ptr& options) = 0; @@ -133,7 +121,6 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { explicit InMemoryFragment(RecordBatchVector record_batches, compute::Expression = compute::literal(true)); - Result Scan(std::shared_ptr options) override; Result ScanBatchesAsync( const std::shared_ptr& options) override; Future> CountRows( diff --git a/cpp/src/arrow/dataset/discovery_test.cc b/cpp/src/arrow/dataset/discovery_test.cc index a51b3c099712e..4ca98baf39fa6 100644 --- a/cpp/src/arrow/dataset/discovery_test.cc +++ b/cpp/src/arrow/dataset/discovery_test.cc @@ -138,7 +138,8 @@ class FileSystemDatasetFactoryTest : public DatasetFactoryTest { } options_ = std::make_shared(); options_->dataset_schema = schema; - ASSERT_OK(SetProjection(options_.get(), schema->field_names())); + ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::Default(*schema)); + SetProjection(options_.get(), std::move(projection)); ASSERT_OK_AND_ASSIGN(dataset_, factory_->Finish(schema)); ASSERT_OK_AND_ASSIGN(auto fragment_it, dataset_->GetFragments()); AssertFragmentsAreFromPath(std::move(fragment_it), paths); diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 2b605b338f707..867244cf04909 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -23,17 +23,18 @@ #include #include +#include "arrow/compute/api_scalar.h" #include "arrow/compute/exec/forest_internal.h" #include "arrow/compute/exec/subtree_internal.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/dataset_writer.h" #include "arrow/dataset/scanner.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/filesystem/filesystem.h" #include "arrow/filesystem/path_util.h" #include "arrow/io/compressed.h" #include "arrow/io/interfaces.h" #include "arrow/io/memory.h" +#include "arrow/util/checked_cast.h" #include "arrow/util/compression.h" #include "arrow/util/iterator.h" #include "arrow/util/macros.h" @@ -45,6 +46,7 @@ namespace arrow { +using internal::checked_cast; using internal::checked_pointer_cast; namespace dataset { @@ -111,66 +113,10 @@ Result> FileFormat::MakeFragment( std::move(partition_expression), std::move(physical_schema))); } -// The following implementation of ScanBatchesAsync is both ugly and terribly inefficient. -// Each of the formats should provide their own efficient implementation. However, this -// is a reasonable starting point or implementation for a dummy/mock format. -Result FileFormat::ScanBatchesAsync( - const std::shared_ptr& scan_options, - const std::shared_ptr& file) const { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanFile(scan_options, file)); - struct State { - State(std::shared_ptr scan_options, ScanTaskIterator scan_task_it) - : scan_options(std::move(scan_options)), - scan_task_it(std::move(scan_task_it)), - current_rb_it(), - finished(false) {} - - std::shared_ptr scan_options; - ScanTaskIterator scan_task_it; - RecordBatchIterator current_rb_it; - bool finished; - }; - struct Generator { - Future> operator()() { - while (!state->finished) { - if (!state->current_rb_it) { - RETURN_NOT_OK(PumpScanTask()); - if (state->finished) { - return AsyncGeneratorEnd>(); - } - } - ARROW_ASSIGN_OR_RAISE(auto next_batch, state->current_rb_it.Next()); - if (IsIterationEnd(next_batch)) { - state->current_rb_it = RecordBatchIterator(); - } else { - return Future>::MakeFinished(next_batch); - } - } - return AsyncGeneratorEnd>(); - } - Status PumpScanTask() { - ARROW_ASSIGN_OR_RAISE(auto next_task, state->scan_task_it.Next()); - if (IsIterationEnd(next_task)) { - state->finished = true; - } else { - ARROW_ASSIGN_OR_RAISE(state->current_rb_it, next_task->Execute()); - } - return Status::OK(); - } - std::shared_ptr state; - }; - return Generator{std::make_shared(scan_options, std::move(scan_task_it))}; -} - Result> FileFragment::ReadPhysicalSchemaImpl() { return format_->Inspect(source_); } -Result FileFragment::Scan(std::shared_ptr options) { - auto self = std::dynamic_pointer_cast(shared_from_this()); - return format_->ScanFile(options, self); -} - Result FileFragment::ScanBatchesAsync( const std::shared_ptr& options) { auto self = std::dynamic_pointer_cast(shared_from_this()); @@ -391,13 +337,6 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options, std::shared_ptr scanner) { - if (!scanner->options()->use_async) { - return Status::Invalid( - "A dataset write operation was invoked on a scanner that was configured for " - "synchronous scanning. Dataset writing requires a scanner configured for " - "asynchronous scanning. Please recreate the scanner with the use_async or " - "UseAsync option set to true"); - } const io::IOContext& io_context = scanner->options()->io_context; std::shared_ptr exec_context = std::make_shared(io_context.pool(), diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index ad89825813f9b..61e7b3e4add22 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -147,15 +147,10 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this> Inspect(const FileSource& source) const = 0; - /// \brief Open a FileFragment for scanning. - /// May populate lazy properties of the FileFragment. - virtual Result ScanFile( + virtual Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const = 0; - virtual Result ScanBatchesAsync( - const std::shared_ptr& options, - const std::shared_ptr& file) const; virtual Future> CountRows( const std::shared_ptr& file, compute::Expression predicate, const std::shared_ptr& options); @@ -186,7 +181,6 @@ class ARROW_DS_EXPORT FileFormat : public std::enable_shared_from_this Scan(std::shared_ptr options) override; Result ScanBatchesAsync( const std::shared_ptr& options) override; Future> CountRows( diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index a2816cc3e315b..4aa2fcd4f25e3 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -198,42 +198,6 @@ static RecordBatchGenerator GeneratorFromReader( return MakeFromFuture(std::move(gen_fut)); } -/// \brief A ScanTask backed by an Csv file. -class CsvScanTask : public ScanTask { - public: - CsvScanTask(std::shared_ptr format, - std::shared_ptr options, - std::shared_ptr fragment) - : ScanTask(std::move(options), fragment), - format_(std::move(format)), - source_(fragment->source()) {} - - Result Execute() override { - auto reader_fut = OpenReaderAsync(source_, *format_, options(), - ::arrow::internal::GetCpuThreadPool()); - auto reader_gen = GeneratorFromReader(std::move(reader_fut), options()->batch_size); - return MakeGeneratorIterator(std::move(reader_gen)); - } - - Future SafeExecute(Executor* executor) override { - auto reader_fut = OpenReaderAsync(source_, *format_, options(), executor); - auto reader_gen = GeneratorFromReader(std::move(reader_fut), options()->batch_size); - return CollectAsyncGenerator(reader_gen); - } - - Future<> SafeVisit( - Executor* executor, - std::function)> visitor) override { - auto reader_fut = OpenReaderAsync(source_, *format_, options(), executor); - auto reader_gen = GeneratorFromReader(std::move(reader_fut), options()->batch_size); - return VisitAsyncGenerator(reader_gen, visitor); - } - - private: - std::shared_ptr format_; - FileSource source_; -}; - bool CsvFileFormat::Equals(const FileFormat& format) const { if (type_name() != format.type_name()) return false; @@ -260,15 +224,6 @@ Result> CsvFileFormat::Inspect(const FileSource& source) return reader->schema(); } -Result CsvFileFormat::ScanFile( - const std::shared_ptr& options, - const std::shared_ptr& fragment) const { - auto this_ = checked_pointer_cast(shared_from_this()); - auto task = std::make_shared(std::move(this_), options, fragment); - - return MakeVectorIterator>({std::move(task)}); -} - Result CsvFileFormat::ScanBatchesAsync( const std::shared_ptr& scan_options, const std::shared_ptr& file) const { diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 8d7391727c67d..bab914559ef4b 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -53,11 +53,6 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { /// \brief Return the schema of the file if possible. Result> Inspect(const FileSource& source) const override; - /// \brief Open a file for scanning - Result ScanFile( - const std::shared_ptr& options, - const std::shared_ptr& fragment) const override; - Result ScanBatchesAsync( const std::shared_ptr& scan_options, const std::shared_ptr& file) const override; diff --git a/cpp/src/arrow/dataset/file_csv_test.cc b/cpp/src/arrow/dataset/file_csv_test.cc index e4303d022a967..832cbb4deaa07 100644 --- a/cpp/src/arrow/dataset/file_csv_test.cc +++ b/cpp/src/arrow/dataset/file_csv_test.cc @@ -98,15 +98,9 @@ class TestCsvFileFormat : public FileFormatFixtureMixin, return ::arrow::internal::make_unique(info, fs, GetCompression()); } - RecordBatchIterator Batches(ScanTaskIterator scan_task_it) { - return MakeFlattenIterator(MakeMaybeMapIterator( - [](std::shared_ptr scan_task) { return scan_task->Execute(); }, - std::move(scan_task_it))); - } - RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); - return Batches(std::move(scan_task_it)); + EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + return MakeGeneratorIterator(batch_gen); } }; diff --git a/cpp/src/arrow/dataset/file_ipc.cc b/cpp/src/arrow/dataset/file_ipc.cc index e01373e79c31c..b5a0a3052e096 100644 --- a/cpp/src/arrow/dataset/file_ipc.cc +++ b/cpp/src/arrow/dataset/file_ipc.cc @@ -108,88 +108,6 @@ static inline Result GetReadOptions( return options; } -/// \brief A ScanTask backed by an Ipc file. -class IpcScanTask : public ScanTask { - public: - IpcScanTask(std::shared_ptr fragment, - std::shared_ptr options) - : ScanTask(std::move(options), fragment), source_(fragment->source()) {} - - Result Execute() override { - struct Impl { - static Result Make(const FileSource& source, - const FileFormat& format, - const ScanOptions& scan_options) { - ARROW_ASSIGN_OR_RAISE(auto reader, OpenReader(source)); - ARROW_ASSIGN_OR_RAISE(auto options, - GetReadOptions(*reader->schema(), format, scan_options)); - ARROW_ASSIGN_OR_RAISE(reader, OpenReader(source, options)); - return RecordBatchIterator( - Impl{std::move(reader), scan_options.batch_size, nullptr, 0}); - } - - Result> Next() { - if (leftover_) { - if (leftover_->num_rows() > batch_size) { - auto chunk = leftover_->Slice(0, batch_size); - leftover_ = leftover_->Slice(batch_size); - return chunk; - } - return std::move(leftover_); - } - if (i_ == reader_->num_record_batches()) { - return nullptr; - } - - ARROW_ASSIGN_OR_RAISE(auto batch, reader_->ReadRecordBatch(i_++)); - if (batch->num_rows() > batch_size) { - leftover_ = batch->Slice(batch_size); - return batch->Slice(0, batch_size); - } - return batch; - } - - std::shared_ptr reader_; - const int64_t batch_size; - std::shared_ptr leftover_; - int i_; - }; - - return Impl::Make(source_, *checked_pointer_cast(fragment_)->format(), - *options_); - } - - private: - FileSource source_; -}; - -class IpcScanTaskIterator { - public: - static Result Make(std::shared_ptr options, - std::shared_ptr fragment) { - return ScanTaskIterator(IpcScanTaskIterator(std::move(options), std::move(fragment))); - } - - Result> Next() { - if (once_) { - // Iteration is done. - return nullptr; - } - - once_ = true; - return std::shared_ptr(new IpcScanTask(fragment_, options_)); - } - - private: - IpcScanTaskIterator(std::shared_ptr options, - std::shared_ptr fragment) - : options_(std::move(options)), fragment_(std::move(fragment)) {} - - bool once_ = false; - std::shared_ptr options_; - std::shared_ptr fragment_; -}; - Result IpcFileFormat::IsSupported(const FileSource& source) const { RETURN_NOT_OK(source.Open().status()); return OpenReader(source).ok(); @@ -200,12 +118,6 @@ Result> IpcFileFormat::Inspect(const FileSource& source) return reader->schema(); } -Result IpcFileFormat::ScanFile( - const std::shared_ptr& options, - const std::shared_ptr& fragment) const { - return IpcScanTaskIterator::Make(options, fragment); -} - Result IpcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { diff --git a/cpp/src/arrow/dataset/file_ipc.h b/cpp/src/arrow/dataset/file_ipc.h index ef78515221c59..4025b6d9763c1 100644 --- a/cpp/src/arrow/dataset/file_ipc.h +++ b/cpp/src/arrow/dataset/file_ipc.h @@ -52,11 +52,6 @@ class ARROW_DS_EXPORT IpcFileFormat : public FileFormat { /// \brief Return the schema of the file if possible. Result> Inspect(const FileSource& source) const override; - /// \brief Open a file for scanning - Result ScanFile( - const std::shared_ptr& options, - const std::shared_ptr& fragment) const override; - Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const override; diff --git a/cpp/src/arrow/dataset/file_ipc_test.cc b/cpp/src/arrow/dataset/file_ipc_test.cc index cb625a9e1db96..6a064f539d9e4 100644 --- a/cpp/src/arrow/dataset/file_ipc_test.cc +++ b/cpp/src/arrow/dataset/file_ipc_test.cc @@ -25,7 +25,6 @@ #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/partition.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" #include "arrow/ipc/reader.h" @@ -127,7 +126,6 @@ TEST_F(TestIpcFileSystemDataset, WriteExceedsMaxPartitions) { write_options_.max_partitions = 2; auto scanner_builder = ScannerBuilder(dataset_, scan_options_); - ASSERT_OK(scanner_builder.UseAsync(true)); EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder.Finish()); EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, testing::HasSubstr("This exceeds the maximum"), FileSystemDataset::Write(write_options_, scanner)); @@ -160,10 +158,8 @@ TEST_P(TestIpcFileFormatScan, FragmentScanOptions) { fragment_scan_options->options = std::make_shared(); fragment_scan_options->options->max_recursion_depth = 0; opts_->fragment_scan_options = fragment_scan_options; - ASSERT_OK_AND_ASSIGN(auto scan_tasks, fragment->Scan(opts_)); - ASSERT_OK_AND_ASSIGN(auto scan_task, scan_tasks.Next()); - ASSERT_OK_AND_ASSIGN(auto batches, scan_task->Execute()); - ASSERT_RAISES(Invalid, batches.Next()); + ASSERT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + ASSERT_FINISHES_AND_RAISES(Invalid, CollectAsyncGenerator(batch_gen)); } INSTANTIATE_TEST_SUITE_P(TestScan, TestIpcFileFormatScan, ::testing::ValuesIn(TestFormatParams::Values()), diff --git a/cpp/src/arrow/dataset/file_orc.cc b/cpp/src/arrow/dataset/file_orc.cc index 44ae3a770bcf7..d316e1e0fd507 100644 --- a/cpp/src/arrow/dataset/file_orc.cc +++ b/cpp/src/arrow/dataset/file_orc.cc @@ -24,8 +24,10 @@ #include "arrow/dataset/file_base.h" #include "arrow/dataset/scanner.h" #include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" #include "arrow/util/iterator.h" #include "arrow/util/logging.h" +#include "arrow/util/thread_pool.h" namespace arrow { @@ -57,13 +59,13 @@ Result> OpenORCReader( } /// \brief A ScanTask backed by an ORC file. -class OrcScanTask : public ScanTask { +class OrcScanTask { public: OrcScanTask(std::shared_ptr fragment, std::shared_ptr options) - : ScanTask(std::move(options), fragment), source_(fragment->source()) {} + : fragment_(std::move(fragment)), options_(std::move(options)) {} - Result Execute() override { + Result Execute() { struct Impl { static Result Make(const FileSource& source, const FileFormat& format, @@ -105,29 +107,32 @@ class OrcScanTask : public ScanTask { std::vector included_fields_; }; - return Impl::Make(source_, *checked_pointer_cast(fragment_)->format(), + return Impl::Make(fragment_->source(), + *checked_pointer_cast(fragment_)->format(), *options_); } private: - FileSource source_; + std::shared_ptr fragment_; + std::shared_ptr options_; }; class OrcScanTaskIterator { public: - static Result Make(std::shared_ptr options, - std::shared_ptr fragment) { - return ScanTaskIterator(OrcScanTaskIterator(std::move(options), std::move(fragment))); + static Result>> Make( + std::shared_ptr options, std::shared_ptr fragment) { + return Iterator>( + OrcScanTaskIterator(std::move(options), std::move(fragment))); } - Result> Next() { + Result> Next() { if (once_) { // Iteration is done. return nullptr; } once_ = true; - return std::shared_ptr(new OrcScanTask(fragment_, options_)); + return std::make_shared(fragment_, options_); } private: @@ -152,10 +157,48 @@ Result> OrcFileFormat::Inspect(const FileSource& source) return reader->ReadSchema(); } -Result OrcFileFormat::ScanFile( +Result OrcFileFormat::ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& fragment) const { - return OrcScanTaskIterator::Make(options, fragment); + const std::shared_ptr& file) const { + // TODO investigate "true" async version + // (https://issues.apache.org/jira/browse/ARROW-13795) + ARROW_ASSIGN_OR_RAISE(auto task_iter, OrcScanTaskIterator::Make(options, file)); + struct IterState { + Iterator> iter; + RecordBatchIterator curr_iter; + bool first; + ::arrow::internal::Executor* io_executor; + }; + struct { + Future> operator()() { + auto state = state_; + return ::arrow::DeferNotOk( + state->io_executor->Submit([state]() -> Result> { + if (state->first) { + ARROW_ASSIGN_OR_RAISE(auto task, state->iter.Next()); + ARROW_ASSIGN_OR_RAISE(state->curr_iter, task->Execute()); + state->first = false; + } + while (!IsIterationEnd(state->curr_iter)) { + ARROW_ASSIGN_OR_RAISE(auto next_batch, state->curr_iter.Next()); + if (IsIterationEnd(next_batch)) { + ARROW_ASSIGN_OR_RAISE(auto task, state->iter.Next()); + if (IsIterationEnd(task)) { + state->curr_iter = IterationEnd(); + } else { + ARROW_ASSIGN_OR_RAISE(state->curr_iter, task->Execute()); + } + } else { + return next_batch; + } + } + return IterationEnd>(); + })); + } + std::shared_ptr state_; + } iter_to_gen{std::shared_ptr( + new IterState{std::move(task_iter), {}, true, options->io_context.executor()})}; + return iter_to_gen; } Future> OrcFileFormat::CountRows( diff --git a/cpp/src/arrow/dataset/file_orc.h b/cpp/src/arrow/dataset/file_orc.h index ca682935bb636..5bbe4df24ad0b 100644 --- a/cpp/src/arrow/dataset/file_orc.h +++ b/cpp/src/arrow/dataset/file_orc.h @@ -51,15 +51,9 @@ class ARROW_DS_EXPORT OrcFileFormat : public FileFormat { /// \brief Return the schema of the file if possible. Result> Inspect(const FileSource& source) const override; - /// \brief Open a file for scanning - Result ScanFile( + Result ScanBatchesAsync( const std::shared_ptr& options, - const std::shared_ptr& fragment) const override; - - // TODO add async version (https://issues.apache.org/jira/browse/ARROW-13795) - // Result ScanBatchesAsync( - // const std::shared_ptr& options, - // const std::shared_ptr& file) const override; + const std::shared_ptr& file) const override; Future> CountRows( const std::shared_ptr& file, compute::Expression predicate, diff --git a/cpp/src/arrow/dataset/file_orc_test.cc b/cpp/src/arrow/dataset/file_orc_test.cc index 9505fb67036e5..546879bff42e5 100644 --- a/cpp/src/arrow/dataset/file_orc_test.cc +++ b/cpp/src/arrow/dataset/file_orc_test.cc @@ -25,7 +25,6 @@ #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" #include "arrow/dataset/partition.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" #include "arrow/record_batch.h" diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc index ba9be0c8b669d..6c8390b280a70 100644 --- a/cpp/src/arrow/dataset/file_parquet.cc +++ b/cpp/src/arrow/dataset/file_parquet.cc @@ -55,82 +55,6 @@ using parquet::arrow::StatisticsAsScalars; namespace { -/// \brief A ScanTask backed by a parquet file and a RowGroup within a parquet file. -class ParquetScanTask : public ScanTask { - public: - ParquetScanTask(int row_group, std::vector column_projection, - std::shared_ptr reader, - std::shared_ptr pre_buffer_once, - std::vector pre_buffer_row_groups, arrow::io::IOContext io_context, - arrow::io::CacheOptions cache_options, - std::shared_ptr options, - std::shared_ptr fragment) - : ScanTask(std::move(options), std::move(fragment)), - row_group_(row_group), - column_projection_(std::move(column_projection)), - reader_(std::move(reader)), - pre_buffer_once_(std::move(pre_buffer_once)), - pre_buffer_row_groups_(std::move(pre_buffer_row_groups)), - io_context_(std::move(io_context)), - cache_options_(cache_options) {} - - Result Execute() override { - // The construction of parquet's RecordBatchReader is deferred here to - // control the memory usage of consumers who materialize all ScanTasks - // before dispatching them, e.g. for scheduling purposes. - // - // The memory and IO incurred by the RecordBatchReader is allocated only - // when Execute is called. - struct { - Result> operator()() const { - return record_batch_reader->Next(); - } - - // The RecordBatchIterator must hold a reference to the FileReader; - // since it must outlive the wrapped RecordBatchReader - std::shared_ptr file_reader; - std::unique_ptr record_batch_reader; - } NextBatch; - - RETURN_NOT_OK(EnsurePreBuffered()); - NextBatch.file_reader = reader_; - RETURN_NOT_OK(reader_->GetRecordBatchReader({row_group_}, column_projection_, - &NextBatch.record_batch_reader)); - return MakeFunctionIterator(std::move(NextBatch)); - } - - // Ensure that pre-buffering has been applied to the underlying Parquet reader - // exactly once (if needed). If we instead set pre_buffer on in the Arrow - // reader properties, each scan task will try to separately pre-buffer, which - // will lead to crashes as they trample the Parquet file reader's internal - // state. Instead, pre-buffer once at the file level. This also has the - // advantage that we can coalesce reads across row groups. - Status EnsurePreBuffered() { - if (pre_buffer_once_) { - BEGIN_PARQUET_CATCH_EXCEPTIONS - std::call_once(*pre_buffer_once_, [this]() { - // Ignore the future here - don't wait for pre-buffering (the reader itself will - // block as necessary) - ARROW_UNUSED(reader_->parquet_reader()->PreBuffer( - pre_buffer_row_groups_, column_projection_, io_context_, cache_options_)); - }); - END_PARQUET_CATCH_EXCEPTIONS - } - return Status::OK(); - } - - private: - int row_group_; - std::vector column_projection_; - std::shared_ptr reader_; - // Pre-buffering state. pre_buffer_once will be nullptr if no pre-buffering is - // to be done. We assume all scan tasks have the same column projection. - std::shared_ptr pre_buffer_once_; - std::vector pre_buffer_row_groups_; - arrow::io::IOContext io_context_; - arrow::io::CacheOptions cache_options_; -}; - parquet::ReaderProperties MakeReaderProperties( const ParquetFileFormat& format, ParquetFragmentScanOptions* parquet_scan_options, MemoryPool* pool = default_memory_pool()) { @@ -400,63 +324,6 @@ Future> ParquetFileFormat::GetReader }); } -Result ParquetFileFormat::ScanFile( - const std::shared_ptr& options, - const std::shared_ptr& fragment) const { - auto* parquet_fragment = checked_cast(fragment.get()); - std::vector row_groups; - - bool pre_filtered = false; - auto MakeEmpty = [] { return MakeEmptyIterator>(); }; - - // If RowGroup metadata is cached completely we can pre-filter RowGroups before opening - // a FileReader, potentially avoiding IO altogether if all RowGroups are excluded due to - // prior statistics knowledge. In the case where a RowGroup doesn't have statistics - // metdata, it will not be excluded. - if (parquet_fragment->metadata() != nullptr) { - ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter)); - - pre_filtered = true; - if (row_groups.empty()) MakeEmpty(); - } - - // Open the reader and pay the real IO cost. - ARROW_ASSIGN_OR_RAISE(std::shared_ptr reader, - GetReader(fragment->source(), options.get())); - - // Ensure that parquet_fragment has FileMetaData - RETURN_NOT_OK(parquet_fragment->EnsureCompleteMetadata(reader.get())); - - if (!pre_filtered) { - // row groups were not already filtered; do this now - ARROW_ASSIGN_OR_RAISE(row_groups, parquet_fragment->FilterRowGroups(options->filter)); - - if (row_groups.empty()) MakeEmpty(); - } - - auto column_projection = InferColumnProjection(*reader, *options); - ScanTaskVector tasks(row_groups.size()); - - ARROW_ASSIGN_OR_RAISE( - auto parquet_scan_options, - GetFragmentScanOptions(kParquetTypeName, options.get(), - default_fragment_scan_options)); - std::shared_ptr pre_buffer_once = nullptr; - if (parquet_scan_options->arrow_reader_properties->pre_buffer()) { - pre_buffer_once = std::make_shared(); - } - - for (size_t i = 0; i < row_groups.size(); ++i) { - tasks[i] = std::make_shared( - row_groups[i], column_projection, reader, pre_buffer_once, row_groups, - parquet_scan_options->arrow_reader_properties->io_context(), - parquet_scan_options->arrow_reader_properties->cache_options(), options, - fragment); - } - - return MakeVectorIterator(std::move(tasks)); -} - Result ParquetFileFormat::ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const { diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h index daf4bd92d59b3..1db9407203647 100644 --- a/cpp/src/arrow/dataset/file_parquet.h +++ b/cpp/src/arrow/dataset/file_parquet.h @@ -95,11 +95,6 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat { /// \brief Return the schema of the file if possible. Result> Inspect(const FileSource& source) const override; - /// \brief Open a file for scanning - Result ScanFile( - const std::shared_ptr& options, - const std::shared_ptr& file) const override; - Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& file) const override; diff --git a/cpp/src/arrow/dataset/file_parquet_test.cc b/cpp/src/arrow/dataset/file_parquet_test.cc index 1612f1d2590bd..090737ad0629b 100644 --- a/cpp/src/arrow/dataset/file_parquet_test.cc +++ b/cpp/src/arrow/dataset/file_parquet_test.cc @@ -21,8 +21,8 @@ #include #include +#include "arrow/compute/api_scalar.h" #include "arrow/dataset/dataset_internal.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/io/memory.h" #include "arrow/io/util_internal.h" @@ -121,15 +121,9 @@ class ParquetFormatHelper { class TestParquetFileFormat : public FileFormatFixtureMixin { public: - RecordBatchIterator Batches(ScanTaskIterator scan_task_it) { - return MakeFlattenIterator(MakeMaybeMapIterator( - [](std::shared_ptr scan_task) { return scan_task->Execute(); }, - std::move(scan_task_it))); - } - RecordBatchIterator Batches(Fragment* fragment) { - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); - return Batches(std::move(scan_task_it)); + EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + return MakeGeneratorIterator(batch_gen); } std::shared_ptr SingleBatch(Fragment* fragment) { @@ -298,7 +292,6 @@ TEST_F(TestParquetFileFormat, MultithreadedScan) { FragmentDataset dataset(ArithmeticDatasetFixture::schema(), {fragment}); ScannerBuilder builder({&dataset, [](...) {}}); - ASSERT_OK(builder.UseAsync(true)); ASSERT_OK(builder.UseThreads(true)); ASSERT_OK(builder.Project({call("add", {field_ref("i64"), literal(3)})}, {""})); ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); @@ -417,7 +410,6 @@ TEST_P(TestParquetFileFormatScan, ScanRecordBatchReaderPreBuffer) { auto fragment_scan_options = std::make_shared(); fragment_scan_options->arrow_reader_properties->set_pre_buffer(true); opts_->fragment_scan_options = fragment_scan_options; - ASSERT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); int64_t row_count = 0; for (auto maybe_batch : PhysicalBatches(fragment)) { @@ -581,10 +573,14 @@ TEST_P(TestParquetFileFormatScan, ExplicitRowGroupSelection) { SetFilter(greater(field_ref("i64"), literal(3))); CountRowsAndBatchesInScan(row_groups_fragment({2, 3, 4, 5}), 4 + 5 + 6, 3); + ASSERT_OK_AND_ASSIGN(auto batch_gen, + row_groups_fragment({kNumRowGroups + 1})->ScanBatchesAsync(opts_)); + Status scan_status = CollectAsyncGenerator(batch_gen).status(); + EXPECT_RAISES_WITH_MESSAGE_THAT( IndexError, testing::HasSubstr("only has " + std::to_string(kNumRowGroups) + " row groups"), - row_groups_fragment({kNumRowGroups + 1})->Scan(opts_)); + scan_status); } TEST_P(TestParquetFileFormatScan, PredicatePushdownRowGroupFragmentsUsingStringColumn) { diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 360a32604da3a..cc89c163cb758 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -85,10 +85,22 @@ TEST(FileSource, BufferBased) { ASSERT_EQ(source1.buffer(), source3.buffer()); } -constexpr int kNumScanTasks = 2; -constexpr int kBatchesPerScanTask = 2; +constexpr int kNumBatches = 4; constexpr int kRowsPerBatch = 1024; class MockFileFormat : public FileFormat { + public: + Result ScanBatchesAsync( + const std::shared_ptr& options, + const std::shared_ptr& file) const override { + auto sch = schema({field("i32", int32())}); + RecordBatchVector batches; + for (int i = 0; i < kNumBatches; i++) { + batches.push_back(ConstantArrayGenerator::Zeroes(kRowsPerBatch, sch)); + } + return MakeVectorGenerator(std::move(batches)); + } + + protected: std::string type_name() const override { return "mock"; } bool Equals(const FileFormat& other) const override { return false; } Result IsSupported(const FileSource& source) const override { return true; } @@ -102,22 +114,6 @@ class MockFileFormat : public FileFormat { return Status::NotImplemented("Not needed for test"); } std::shared_ptr DefaultWriteOptions() override { return nullptr; } - - Result ScanFile( - const std::shared_ptr& options, - const std::shared_ptr& file) const override { - auto sch = schema({field("i32", int32())}); - ScanTaskVector scan_tasks; - for (int i = 0; i < kNumScanTasks; i++) { - RecordBatchVector batches; - for (int j = 0; j < kBatchesPerScanTask; j++) { - batches.push_back(ConstantArrayGenerator::Zeroes(kRowsPerBatch, sch)); - } - scan_tasks.push_back(std::make_shared( - std::move(batches), std::make_shared(), nullptr)); - } - return MakeVectorIterator(std::move(scan_tasks)); - } }; TEST(FileFormat, ScanAsync) { @@ -125,8 +121,8 @@ TEST(FileFormat, ScanAsync) { auto scan_options = std::make_shared(); ASSERT_OK_AND_ASSIGN(auto batch_gen, format.ScanBatchesAsync(scan_options, nullptr)); ASSERT_FINISHES_OK_AND_ASSIGN(auto batches, CollectAsyncGenerator(batch_gen)); - ASSERT_EQ(kNumScanTasks * kBatchesPerScanTask, static_cast(batches.size())); - for (int i = 0; i < kNumScanTasks * kBatchesPerScanTask; i++) { + ASSERT_EQ(kNumBatches, static_cast(batches.size())); + for (int i = 0; i < kNumBatches; i++) { ASSERT_EQ(kRowsPerBatch, batches[i]->num_rows()); } } @@ -320,7 +316,6 @@ TEST_F(TestFileSystemDataset, WriteProjected) { ASSERT_OK(scanner_builder->Project( {compute::call("add", {compute::field_ref("a"), compute::literal(1)})}, {"a_plus_one"})); - ASSERT_OK(scanner_builder->UseAsync(true)); ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish()); ASSERT_OK(FileSystemDataset::Write(write_options, scanner)); diff --git a/cpp/src/arrow/dataset/partition_test.cc b/cpp/src/arrow/dataset/partition_test.cc index 7643c9466058d..ce5760e7b6c8d 100644 --- a/cpp/src/arrow/dataset/partition_test.cc +++ b/cpp/src/arrow/dataset/partition_test.cc @@ -28,7 +28,6 @@ #include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/filesystem/path_util.h" #include "arrow/status.h" diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 23942ec37da6f..8384adcd890a2 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -24,6 +24,7 @@ #include #include "arrow/array/array_primitive.h" +#include "arrow/array/util.h" #include "arrow/compute/api_aggregate.h" #include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" @@ -32,7 +33,6 @@ #include "arrow/dataset/dataset.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/plan.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/table.h" #include "arrow/util/async_generator.h" #include "arrow/util/iterator.h" @@ -46,6 +46,8 @@ using internal::Executor; using internal::SerialExecutor; using internal::TaskGroup; +using internal::checked_cast; + namespace dataset { using FragmentGenerator = std::function>()>; @@ -63,110 +65,6 @@ std::vector ScanOptions::MaterializedFields() const { return fields; } -std::shared_ptr ScanOptions::TaskGroup() const { - if (use_threads) { - auto* thread_pool = arrow::internal::GetCpuThreadPool(); - return TaskGroup::MakeThreaded(thread_pool); - } - return TaskGroup::MakeSerial(); -} - -Result InMemoryScanTask::Execute() { - return MakeVectorIterator(record_batches_); -} - -Future ScanTask::SafeExecute(Executor* executor) { - // If the ScanTask can't possibly be async then just execute it - ARROW_ASSIGN_OR_RAISE(auto rb_it, Execute()); - return Future::MakeFinished(rb_it.ToVector()); -} - -Future<> ScanTask::SafeVisit( - Executor* executor, std::function)> visitor) { - // If the ScanTask can't possibly be async then just execute it - ARROW_ASSIGN_OR_RAISE(auto rb_it, Execute()); - return Future<>::MakeFinished(rb_it.Visit(visitor)); -} - -Result Scanner::Scan() { - // TODO(ARROW-12289) This is overridden in SyncScanner and will never be implemented in - // AsyncScanner. It is deprecated and will eventually go away. - return Status::NotImplemented("This scanner does not support the legacy Scan() method"); -} - -Result Scanner::ScanBatchesUnordered() { - // If a scanner doesn't support unordered scanning (i.e. SyncScanner) then we just - // fall back to an ordered scan and assign the appropriate tagging - ARROW_ASSIGN_OR_RAISE(auto ordered_scan, ScanBatches()); - return AddPositioningToInOrderScan(std::move(ordered_scan)); -} - -Result Scanner::AddPositioningToInOrderScan( - TaggedRecordBatchIterator scan) { - ARROW_ASSIGN_OR_RAISE(auto first, scan.Next()); - if (IsIterationEnd(first)) { - return MakeEmptyIterator(); - } - struct State { - State(TaggedRecordBatchIterator source, TaggedRecordBatch first) - : source(std::move(source)), - batch_index(0), - fragment_index(0), - finished(false), - prev_batch(std::move(first)) {} - TaggedRecordBatchIterator source; - int batch_index; - int fragment_index; - bool finished; - TaggedRecordBatch prev_batch; - }; - struct EnumeratingIterator { - Result Next() { - if (state->finished) { - return IterationEnd(); - } - ARROW_ASSIGN_OR_RAISE(auto next, state->source.Next()); - if (IsIterationEnd(next)) { - state->finished = true; - return EnumeratedRecordBatch{ - {std::move(state->prev_batch.record_batch), state->batch_index, true}, - {std::move(state->prev_batch.fragment), state->fragment_index, true}}; - } - auto prev = std::move(state->prev_batch); - bool prev_is_last_batch = false; - auto prev_batch_index = state->batch_index; - auto prev_fragment_index = state->fragment_index; - // Reference equality here seems risky but a dataset should have a constant set of - // fragments which should be consistent for the lifetime of a scan - if (prev.fragment.get() != next.fragment.get()) { - state->batch_index = 0; - state->fragment_index++; - prev_is_last_batch = true; - } else { - state->batch_index++; - } - state->prev_batch = std::move(next); - return EnumeratedRecordBatch{ - {std::move(prev.record_batch), prev_batch_index, prev_is_last_batch}, - {std::move(prev.fragment), prev_fragment_index, false}}; - } - std::shared_ptr state; - }; - return EnumeratedRecordBatchIterator( - EnumeratingIterator{std::make_shared(std::move(scan), std::move(first))}); -} - -Result Scanner::CountRows() { - // Naive base implementation - ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatchesUnordered()); - int64_t count = 0; - RETURN_NOT_OK(batch_it.Visit([&](EnumeratedRecordBatch batch) { - count += batch.record_batch.value->num_rows(); - return Status::OK(); - })); - return count; -} - namespace { class ScannerRecordBatchReader : public RecordBatchReader { public: @@ -191,235 +89,8 @@ class ScannerRecordBatchReader : public RecordBatchReader { }; } // namespace -Result> Scanner::ToRecordBatchReader() { - ARROW_ASSIGN_OR_RAISE(auto it, ScanBatches()); - return std::make_shared(options()->projected_schema, - std::move(it)); -} - namespace { -struct ScanBatchesState : public std::enable_shared_from_this { - explicit ScanBatchesState(ScanTaskIterator scan_task_it, - std::shared_ptr task_group_) - : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} - - void ResizeBatches(size_t task_index) { - if (task_batches.size() <= task_index) { - task_batches.resize(task_index + 1); - task_drained.resize(task_index + 1); - } - } - - void Push(TaggedRecordBatch batch, size_t task_index) { - { - std::lock_guard lock(mutex); - ResizeBatches(task_index); - task_batches[task_index].push_back(std::move(batch)); - } - ready.notify_one(); - } - - template - Result PushError(Result&& result, size_t task_index) { - if (!result.ok()) { - { - std::lock_guard lock(mutex); - task_drained[task_index] = true; - iteration_error = result.status(); - } - ready.notify_one(); - } - return std::move(result); - } - - Status Finish(size_t task_index) { - { - std::lock_guard lock(mutex); - ResizeBatches(task_index); - task_drained[task_index] = true; - } - ready.notify_one(); - return Status::OK(); - } - - void PushScanTask() { - if (no_more_tasks) { - return; - } - std::unique_lock lock(mutex); - auto maybe_task = scan_tasks.Next(); - if (!maybe_task.ok()) { - no_more_tasks = true; - iteration_error = maybe_task.status(); - return; - } - auto scan_task = maybe_task.ValueOrDie(); - if (IsIterationEnd(scan_task)) { - no_more_tasks = true; - return; - } - auto state = shared_from_this(); - auto id = next_scan_task_id++; - ResizeBatches(id); - - lock.unlock(); - task_group->Append([state, id, scan_task]() { - // If we were to return an error to the task group, subsequent tasks - // may never be executed, which would produce a deadlock in Pop() - // (ARROW-13480). - auto status_unused = [&]() { - ARROW_ASSIGN_OR_RAISE(auto batch_it, state->PushError(scan_task->Execute(), id)); - for (auto maybe_batch : batch_it) { - ARROW_ASSIGN_OR_RAISE(auto batch, state->PushError(std::move(maybe_batch), id)); - state->Push(TaggedRecordBatch{std::move(batch), scan_task->fragment()}, id); - } - return state->Finish(id); - }(); - return Status::OK(); - }); - } - - Result Pop() { - std::unique_lock lock(mutex); - ready.wait(lock, [this, &lock] { - while (pop_cursor < task_batches.size()) { - // queue for current scan task contains at least one batch, pop that - if (!task_batches[pop_cursor].empty()) return true; - // queue is empty but will be appended to eventually, wait for that - if (!task_drained[pop_cursor]) return false; - - // Finished draining current scan task, enqueue a new one - ++pop_cursor; - // Must unlock since serial task group will execute synchronously - lock.unlock(); - PushScanTask(); - lock.lock(); - } - DCHECK(no_more_tasks); - // all scan tasks drained (or getting next task failed), terminate - return true; - }); - - // We're not bubbling any task errors into the task group - DCHECK(task_group->ok()); - - if (pop_cursor == task_batches.size()) { - // Don't report an error until we yield up everything we can first - RETURN_NOT_OK(iteration_error); - return IterationEnd(); - } - - auto batch = std::move(task_batches[pop_cursor].front()); - task_batches[pop_cursor].pop_front(); - return batch; - } - - /// Protecting mutating accesses to batches - std::mutex mutex; - std::condition_variable ready; - ScanTaskIterator scan_tasks; - std::shared_ptr task_group; - int next_scan_task_id = 0; - bool no_more_tasks = false; - Status iteration_error; - std::vector> task_batches; - std::vector task_drained; - size_t pop_cursor = 0; -}; - -class SyncScanner : public Scanner { - public: - SyncScanner(std::shared_ptr dataset, std::shared_ptr scan_options) - : Scanner(std::move(scan_options)), dataset_(std::move(dataset)) {} - - Result ScanBatches() override; - Result Scan() override; - Status Scan(std::function visitor) override; - Result> ToTable() override; - Result ScanBatchesAsync() override; - Result ScanBatchesUnorderedAsync() override; - Result CountRows() override; - const std::shared_ptr& dataset() const override; - - protected: - /// \brief GetFragments returns an iterator over all Fragments in this scan. - Result GetFragments(); - Result ScanBatches(ScanTaskIterator scan_task_it); - Future> ToTableInternal(Executor* cpu_executor); - Result ScanInternal(); - - std::shared_ptr dataset_; -}; - -Result SyncScanner::ScanBatches() { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanInternal()); - return ScanBatches(std::move(scan_task_it)); -} - -Result SyncScanner::ScanBatches( - ScanTaskIterator scan_task_it) { - auto task_group = scan_options_->TaskGroup(); - auto state = std::make_shared(std::move(scan_task_it), task_group); - for (int i = 0; i < scan_options_->fragment_readahead; i++) { - state->PushScanTask(); - } - return MakeFunctionIterator([task_group, state]() -> Result { - ARROW_ASSIGN_OR_RAISE(auto batch, state->Pop()); - if (!IsIterationEnd(batch)) return batch; - RETURN_NOT_OK(task_group->Finish()); - return IterationEnd(); - }); -} - -Result SyncScanner::ScanBatchesAsync() { - return Status::NotImplemented("Asynchronous scanning is not supported by SyncScanner"); -} - -Result SyncScanner::ScanBatchesUnorderedAsync() { - return Status::NotImplemented("Asynchronous scanning is not supported by SyncScanner"); -} - -Result SyncScanner::GetFragments() { - // Transform Datasets in a flat Iterator. This - // iterator is lazily constructed, i.e. Dataset::GetFragments is - // not invoked until a Fragment is requested. - return GetFragmentsFromDatasets({dataset_}, scan_options_->filter); -} - -Result SyncScanner::Scan() { return ScanInternal(); } - -Status SyncScanner::Scan(std::function visitor) { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanInternal()); - - auto task_group = scan_options_->TaskGroup(); - - for (auto maybe_scan_task : scan_task_it) { - ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); - task_group->Append([scan_task, visitor] { - ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); - for (auto maybe_batch : batch_it) { - ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); - RETURN_NOT_OK( - visitor(TaggedRecordBatch{std::move(batch), scan_task->fragment()})); - } - return Status::OK(); - }); - } - - return task_group->Finish(); -} - -Result SyncScanner::ScanInternal() { - // Transforms Iterator into a unified - // Iterator. The first Iterator::Next invocation is going to do - // all the work of unwinding the chained iterators. - ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments()); - return GetScanTaskIterator(std::move(fragment_it), scan_options_); -} - -const std::shared_ptr& SyncScanner::dataset() const { return dataset_; } - class AsyncScanner : public Scanner, public std::enable_shared_from_this { public: AsyncScanner(std::shared_ptr dataset, @@ -433,8 +104,11 @@ class AsyncScanner : public Scanner, public std::enable_shared_from_this ScanBatchesAsync() override; Result ScanBatchesUnordered() override; Result ScanBatchesUnorderedAsync() override; + Result> TakeRows(const Array& indices) override; + Result> Head(int64_t num_rows) override; Result> ToTable() override; Result CountRows() override; + Result> ToRecordBatchReader() override; const std::shared_ptr& dataset() const override; private: @@ -489,21 +163,6 @@ const FieldVector kAugmentedFields{ field("__last_in_fragment", boolean()), }; -class OneShotScanTask : public ScanTask { - public: - OneShotScanTask(RecordBatchIterator batch_it, std::shared_ptr options, - std::shared_ptr fragment) - : ScanTask(std::move(options), std::move(fragment)), - batch_it_(std::move(batch_it)) {} - Result Execute() override { - if (!batch_it_) return Status::Invalid("OneShotScanTask was already scanned"); - return std::move(batch_it_); - } - - private: - RecordBatchIterator batch_it_; -}; - class OneShotFragment : public Fragment { public: OneShotFragment(std::shared_ptr schema, RecordBatchIterator batch_it) @@ -515,12 +174,6 @@ class OneShotFragment : public Fragment { if (!batch_it_) return Status::Invalid("OneShotFragment was already scanned"); return Status::OK(); } - Result Scan(std::shared_ptr options) override { - RETURN_NOT_OK(CheckConsumed()); - ScanTaskVector tasks{std::make_shared( - std::move(batch_it_), std::move(options), shared_from_this())}; - return MakeVectorIterator(std::move(tasks)); - } Result ScanBatchesAsync( const std::shared_ptr& options) override { RETURN_NOT_OK(CheckConsumed()); @@ -645,6 +298,111 @@ Result AsyncScanner::ScanBatchesUnorderedAsync( }); } +Result> AsyncScanner::TakeRows(const Array& indices) { + if (indices.null_count() != 0) { + return Status::NotImplemented("null take indices"); + } + + compute::ExecContext ctx(scan_options_->pool); + + const Array* original_indices; + // If we have to cast, this is the backing reference + std::shared_ptr original_indices_ptr; + if (indices.type_id() != Type::INT64) { + ARROW_ASSIGN_OR_RAISE( + original_indices_ptr, + compute::Cast(indices, int64(), compute::CastOptions::Safe(), &ctx)); + original_indices = original_indices_ptr.get(); + } else { + original_indices = &indices; + } + + std::shared_ptr unsort_indices; + { + ARROW_ASSIGN_OR_RAISE( + auto sort_indices, + compute::SortIndices(*original_indices, compute::SortOrder::Ascending, &ctx)); + ARROW_ASSIGN_OR_RAISE(original_indices_ptr, + compute::Take(*original_indices, *sort_indices, + compute::TakeOptions::Defaults(), &ctx)); + original_indices = original_indices_ptr.get(); + ARROW_ASSIGN_OR_RAISE( + unsort_indices, + compute::SortIndices(*sort_indices, compute::SortOrder::Ascending, &ctx)); + } + + RecordBatchVector out_batches; + + auto raw_indices = static_cast(*original_indices).raw_values(); + int64_t offset = 0, row_begin = 0; + + ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatches()); + while (true) { + ARROW_ASSIGN_OR_RAISE(auto batch, batch_it.Next()); + if (IsIterationEnd(batch)) break; + if (offset == original_indices->length()) break; + DCHECK_LT(offset, original_indices->length()); + + int64_t length = 0; + while (offset + length < original_indices->length()) { + auto rel_index = raw_indices[offset + length] - row_begin; + if (rel_index >= batch.record_batch->num_rows()) break; + ++length; + } + DCHECK_LE(offset + length, original_indices->length()); + if (length == 0) { + row_begin += batch.record_batch->num_rows(); + continue; + } + + Datum rel_indices = original_indices->Slice(offset, length); + ARROW_ASSIGN_OR_RAISE(rel_indices, + compute::Subtract(rel_indices, Datum(row_begin), + compute::ArithmeticOptions(), &ctx)); + + ARROW_ASSIGN_OR_RAISE(Datum out_batch, + compute::Take(batch.record_batch, rel_indices, + compute::TakeOptions::Defaults(), &ctx)); + out_batches.push_back(out_batch.record_batch()); + + offset += length; + row_begin += batch.record_batch->num_rows(); + } + + if (offset < original_indices->length()) { + std::stringstream error; + const int64_t max_values_shown = 3; + const int64_t num_remaining = original_indices->length() - offset; + for (int64_t i = 0; i < std::min(max_values_shown, num_remaining); i++) { + if (i > 0) error << ", "; + error << static_cast(original_indices)->Value(offset + i); + } + if (num_remaining > max_values_shown) error << ", ..."; + return Status::IndexError("Some indices were out of bounds: ", error.str()); + } + ARROW_ASSIGN_OR_RAISE(Datum out, Table::FromRecordBatches(options()->projected_schema, + std::move(out_batches))); + ARROW_ASSIGN_OR_RAISE( + out, compute::Take(out, unsort_indices, compute::TakeOptions::Defaults(), &ctx)); + return out.table(); +} + +Result> AsyncScanner::Head(int64_t num_rows) { + if (num_rows == 0) { + return Table::FromRecordBatches(options()->projected_schema, {}); + } + ARROW_ASSIGN_OR_RAISE(auto batch_iterator, ScanBatches()); + RecordBatchVector batches; + while (true) { + ARROW_ASSIGN_OR_RAISE(auto batch, batch_iterator.Next()); + if (IsIterationEnd(batch)) break; + batches.push_back(batch.record_batch->Slice(0, num_rows)); + num_rows -= batch.record_batch->num_rows(); + if (num_rows <= 0) break; + } + return Table::FromRecordBatches(options()->projected_schema, batches); +} + Result AsyncScanner::ScanBatchesAsync() { return ScanBatchesAsync(::arrow::internal::GetCpuThreadPool()); } @@ -774,7 +532,10 @@ Result AsyncScanner::CountRows() { ARROW_ASSIGN_OR_RAISE(auto plan, compute::ExecPlan::Make(&exec_context)); // Drop projection since we only need to count rows const auto options = std::make_shared(*scan_options_); - RETURN_NOT_OK(SetProjection(options.get(), std::vector())); + ARROW_ASSIGN_OR_RAISE(auto empty_projection, + ProjectionDescr::FromNames(std::vector(), + *scan_options_->dataset_schema)); + SetProjection(options.get(), empty_projection); std::atomic total{0}; @@ -823,10 +584,77 @@ Result AsyncScanner::CountRows() { return total.load(); } +Result> AsyncScanner::ToRecordBatchReader() { + ARROW_ASSIGN_OR_RAISE(auto it, ScanBatches()); + return std::make_shared(options()->projected_schema, + std::move(it)); +} + const std::shared_ptr& AsyncScanner::dataset() const { return dataset_; } +Status NestedFieldRefsNotImplemented() { + // TODO(ARROW-11259) Several functions (for example, IpcScanTask::Make) assume that + // only top level fields will be materialized. + return Status::NotImplemented("Nested field references in scans."); +} + } // namespace +Result ProjectionDescr::FromStructExpression( + const compute::Expression& projection, const Schema& dataset_schema) { + ARROW_ASSIGN_OR_RAISE(compute::Expression bound_expression, + projection.Bind(dataset_schema)); + + if (bound_expression.type()->id() != Type::STRUCT) { + return Status::Invalid("Projection ", projection.ToString(), + " cannot yield record batches"); + } + std::shared_ptr projection_schema = + ::arrow::schema(checked_cast(*bound_expression.type()).fields(), + dataset_schema.metadata()); + + return ProjectionDescr{std::move(bound_expression), std::move(projection_schema)}; +} + +Result ProjectionDescr::FromExpressions( + std::vector exprs, std::vector names, + const Schema& dataset_schema) { + compute::MakeStructOptions project_options{std::move(names)}; + + for (size_t i = 0; i < exprs.size(); ++i) { + if (auto ref = exprs[i].field_ref()) { + if (!ref->name()) return NestedFieldRefsNotImplemented(); + + // set metadata and nullability for plain field references + ARROW_ASSIGN_OR_RAISE(auto field, ref->GetOne(dataset_schema)); + project_options.field_nullability[i] = field->nullable(); + project_options.field_metadata[i] = field->metadata(); + } + } + + return ProjectionDescr::FromStructExpression( + call("make_struct", std::move(exprs), std::move(project_options)), dataset_schema); +} + +Result ProjectionDescr::FromNames(std::vector names, + const Schema& dataset_schema) { + std::vector exprs(names.size()); + for (size_t i = 0; i < exprs.size(); ++i) { + exprs[i] = compute::field_ref(names[i]); + } + return ProjectionDescr::FromExpressions(std::move(exprs), std::move(names), + dataset_schema); +} + +Result ProjectionDescr::Default(const Schema& dataset_schema) { + return ProjectionDescr::FromNames(dataset_schema.field_names(), dataset_schema); +} + +void SetProjection(ScanOptions* options, ProjectionDescr projection) { + options->projection = std::move(projection.expression); + options->projected_schema = std::move(projection.schema); +} + ScannerBuilder::ScannerBuilder(std::shared_ptr dataset) : ScannerBuilder(std::move(dataset), std::make_shared()) {} @@ -862,16 +690,31 @@ const std::shared_ptr& ScannerBuilder::projected_schema() const { } Status ScannerBuilder::Project(std::vector columns) { - return SetProjection(scan_options_.get(), std::move(columns)); + ARROW_ASSIGN_OR_RAISE( + auto projection, + ProjectionDescr::FromNames(std::move(columns), *scan_options_->dataset_schema)); + SetProjection(scan_options_.get(), std::move(projection)); + return Status::OK(); } Status ScannerBuilder::Project(std::vector exprs, std::vector names) { - return SetProjection(scan_options_.get(), std::move(exprs), std::move(names)); + ARROW_ASSIGN_OR_RAISE(auto projection, ProjectionDescr::FromExpressions( + std::move(exprs), std::move(names), + *scan_options_->dataset_schema)); + SetProjection(scan_options_.get(), std::move(projection)); + return Status::OK(); } Status ScannerBuilder::Filter(const compute::Expression& filter) { - return SetFilter(scan_options_.get(), filter); + for (const auto& ref : FieldsInExpression(filter)) { + if (!ref.name()) return NestedFieldRefsNotImplemented(); + + RETURN_NOT_OK(ref.FindOne(*scan_options_->dataset_schema)); + } + ARROW_ASSIGN_OR_RAISE(scan_options_->filter, + filter.Bind(*scan_options_->dataset_schema)); + return Status::OK(); } Status ScannerBuilder::UseThreads(bool use_threads) { @@ -888,11 +731,6 @@ Status ScannerBuilder::FragmentReadahead(int fragment_readahead) { return Status::OK(); } -Status ScannerBuilder::UseAsync(bool use_async) { - scan_options_->use_async = use_async; - return Status::OK(); -} - Status ScannerBuilder::BatchSize(int64_t batch_size) { if (batch_size <= 0) { return Status::Invalid("BatchSize must be greater than 0, got ", batch_size); @@ -917,224 +755,7 @@ Result> ScannerBuilder::Finish() { RETURN_NOT_OK(Project(scan_options_->dataset_schema->field_names())); } - if (scan_options_->use_async) { - return std::make_shared(dataset_, scan_options_); - } else { - return std::make_shared(dataset_, scan_options_); - } -} - -namespace { - -inline RecordBatchVector FlattenRecordBatchVector( - std::vector nested_batches) { - RecordBatchVector flattened; - - for (auto& task_batches : nested_batches) { - for (auto& batch : task_batches) { - flattened.emplace_back(std::move(batch)); - } - } - - return flattened; -} - -struct TableAssemblyState { - /// Protecting mutating accesses to batches - std::mutex mutex{}; - std::vector batches{}; - - void Emplace(RecordBatchVector b, size_t position) { - std::lock_guard lock(mutex); - if (batches.size() <= position) { - batches.resize(position + 1); - } - batches[position] = std::move(b); - } -}; - -Result> SyncScanner::ToTable() { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, ScanInternal()); - auto task_group = scan_options_->TaskGroup(); - - /// Wraps the state in a shared_ptr to ensure that failing ScanTasks don't - /// invalidate concurrently running tasks when Finish() early returns - /// and the mutex/batches fail out of scope. - auto state = std::make_shared(); - - // TODO (ARROW-11797) Migrate to using ScanBatches() - size_t scan_task_id = 0; - for (auto maybe_scan_task : scan_task_it) { - ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task); - - auto id = scan_task_id++; - task_group->Append([state, id, scan_task] { - ARROW_ASSIGN_OR_RAISE( - auto local, - ::arrow::internal::SerialExecutor::RunInSerialExecutor( - [&](Executor* executor) { return scan_task->SafeExecute(executor); })); - state->Emplace(std::move(local), id); - return Status::OK(); - }); - } - auto scan_options = scan_options_; - // Wait for all tasks to complete, or the first error - RETURN_NOT_OK(task_group->Finish()); - return Table::FromRecordBatches(scan_options->projected_schema, - FlattenRecordBatchVector(std::move(state->batches))); -} - -Result SyncScanner::CountRows() { - // While readers could implement an optimization where they just fabricate empty - // batches based on metadata when no columns are selected, skipping I/O (and - // indeed, the Parquet reader does this), counting rows using that optimization is - // still slower than just hitting metadata directly where possible. - ARROW_ASSIGN_OR_RAISE(auto fragment_it, GetFragments()); - // Fragment is non-null iff fast path could not be taken. - std::vector>>> futures; - for (auto maybe_fragment : fragment_it) { - ARROW_ASSIGN_OR_RAISE(auto fragment, maybe_fragment); - auto count_fut = fragment->CountRows(scan_options_->filter, scan_options_); - futures.push_back( - count_fut.Then([fragment](const util::optional& count) - -> std::pair> { - if (count.has_value()) { - return std::make_pair(*count, nullptr); - } - return std::make_pair(0, std::move(fragment)); - })); - } - - int64_t count = 0; - FragmentVector fragments; - for (auto& future : futures) { - ARROW_ASSIGN_OR_RAISE(auto count_result, future.result()); - count += count_result.first; - if (count_result.second) { - fragments.push_back(std::move(count_result.second)); - } - } - // Now check for any fragments where we couldn't take the fast path - if (!fragments.empty()) { - auto options = std::make_shared(*scan_options_); - RETURN_NOT_OK(SetProjection(options.get(), std::vector())); - ARROW_ASSIGN_OR_RAISE( - auto scan_task_it, - GetScanTaskIterator(MakeVectorIterator(std::move(fragments)), options)); - ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatches(std::move(scan_task_it))); - RETURN_NOT_OK(batch_it.Visit([&](TaggedRecordBatch batch) { - count += batch.record_batch->num_rows(); - return Status::OK(); - })); - } - return count; -} - -} // namespace - -Result> Scanner::TakeRows(const Array& indices) { - if (indices.null_count() != 0) { - return Status::NotImplemented("null take indices"); - } - - compute::ExecContext ctx(scan_options_->pool); - - const Array* original_indices; - // If we have to cast, this is the backing reference - std::shared_ptr original_indices_ptr; - if (indices.type_id() != Type::INT64) { - ARROW_ASSIGN_OR_RAISE( - original_indices_ptr, - compute::Cast(indices, int64(), compute::CastOptions::Safe(), &ctx)); - original_indices = original_indices_ptr.get(); - } else { - original_indices = &indices; - } - - std::shared_ptr unsort_indices; - { - ARROW_ASSIGN_OR_RAISE( - auto sort_indices, - compute::SortIndices(*original_indices, compute::SortOrder::Ascending, &ctx)); - ARROW_ASSIGN_OR_RAISE(original_indices_ptr, - compute::Take(*original_indices, *sort_indices, - compute::TakeOptions::Defaults(), &ctx)); - original_indices = original_indices_ptr.get(); - ARROW_ASSIGN_OR_RAISE( - unsort_indices, - compute::SortIndices(*sort_indices, compute::SortOrder::Ascending, &ctx)); - } - - RecordBatchVector out_batches; - - auto raw_indices = static_cast(*original_indices).raw_values(); - int64_t offset = 0, row_begin = 0; - - ARROW_ASSIGN_OR_RAISE(auto batch_it, ScanBatches()); - while (true) { - ARROW_ASSIGN_OR_RAISE(auto batch, batch_it.Next()); - if (IsIterationEnd(batch)) break; - if (offset == original_indices->length()) break; - DCHECK_LT(offset, original_indices->length()); - - int64_t length = 0; - while (offset + length < original_indices->length()) { - auto rel_index = raw_indices[offset + length] - row_begin; - if (rel_index >= batch.record_batch->num_rows()) break; - ++length; - } - DCHECK_LE(offset + length, original_indices->length()); - if (length == 0) { - row_begin += batch.record_batch->num_rows(); - continue; - } - - Datum rel_indices = original_indices->Slice(offset, length); - ARROW_ASSIGN_OR_RAISE(rel_indices, - compute::Subtract(rel_indices, Datum(row_begin), - compute::ArithmeticOptions(), &ctx)); - - ARROW_ASSIGN_OR_RAISE(Datum out_batch, - compute::Take(batch.record_batch, rel_indices, - compute::TakeOptions::Defaults(), &ctx)); - out_batches.push_back(out_batch.record_batch()); - - offset += length; - row_begin += batch.record_batch->num_rows(); - } - - if (offset < original_indices->length()) { - std::stringstream error; - const int64_t max_values_shown = 3; - const int64_t num_remaining = original_indices->length() - offset; - for (int64_t i = 0; i < std::min(max_values_shown, num_remaining); i++) { - if (i > 0) error << ", "; - error << static_cast(original_indices)->Value(offset + i); - } - if (num_remaining > max_values_shown) error << ", ..."; - return Status::IndexError("Some indices were out of bounds: ", error.str()); - } - ARROW_ASSIGN_OR_RAISE(Datum out, Table::FromRecordBatches(options()->projected_schema, - std::move(out_batches))); - ARROW_ASSIGN_OR_RAISE( - out, compute::Take(out, unsort_indices, compute::TakeOptions::Defaults(), &ctx)); - return out.table(); -} - -Result> Scanner::Head(int64_t num_rows) { - if (num_rows == 0) { - return Table::FromRecordBatches(options()->projected_schema, {}); - } - ARROW_ASSIGN_OR_RAISE(auto batch_iterator, ScanBatches()); - RecordBatchVector batches; - while (true) { - ARROW_ASSIGN_OR_RAISE(auto batch, batch_iterator.Next()); - if (IsIterationEnd(batch)) break; - batches.push_back(batch.record_batch->Slice(0, num_rows)); - num_rows -= batch.record_batch->num_rows(); - if (num_rows <= 0) break; - } - return Table::FromRecordBatches(options()->projected_schema, batches); + return std::make_shared(dataset_, scan_options_); } namespace { @@ -1148,10 +769,6 @@ Result MakeScanNode(compute::ExecPlan* plan, const auto& backpressure_toggle = scan_node_options.backpressure_toggle; bool require_sequenced_output = scan_node_options.require_sequenced_output; - if (!scan_options->use_async) { - return Status::NotImplemented("ScanNodes without asynchrony"); - } - if (scan_options->dataset_schema == nullptr) { scan_options->dataset_schema = dataset->schema(); } diff --git a/cpp/src/arrow/dataset/scanner.h b/cpp/src/arrow/dataset/scanner.h index 75e9806fb8f1d..738915a812969 100644 --- a/cpp/src/arrow/dataset/scanner.h +++ b/cpp/src/arrow/dataset/scanner.h @@ -115,11 +115,6 @@ struct ARROW_DS_EXPORT ScanOptions { /// Note: This must be true in order for any readahead to happen bool use_threads = false; - /// If true then an asycnhronous implementation of the scanner will be used. - /// This implementation is newer and generally performs better. However, it - /// makes extensive use of threading and is still considered experimental - bool use_async = false; - /// Fragment-specific scan options. std::shared_ptr fragment_scan_options; @@ -138,41 +133,46 @@ struct ARROW_DS_EXPORT ScanOptions { // This is used by Fragment implementations to apply the column // sub-selection optimization. std::vector MaterializedFields() const; - - // Return a threaded or serial TaskGroup according to use_threads. - std::shared_ptr<::arrow::internal::TaskGroup> TaskGroup() const; }; -/// \brief Read record batches from a range of a single data fragment. A -/// ScanTask is meant to be a unit of work to be dispatched. The implementation -/// must be thread and concurrent safe. -class ARROW_DS_EXPORT ScanTask { - public: - /// \brief Iterate through sequence of materialized record batches - /// resulting from the Scan. Execution semantics are encapsulated in the - /// particular ScanTask implementation - virtual Result Execute() = 0; - virtual Future SafeExecute(::arrow::internal::Executor* executor); - virtual Future<> SafeVisit(::arrow::internal::Executor* executor, - std::function)> visitor); +/// \brief Describes a projection +struct ARROW_DS_EXPORT ProjectionDescr { + /// \brief The projection expression itself + /// This expression must be a call to make_struct + compute::Expression expression; + /// \brief The output schema of the projection. - virtual ~ScanTask() = default; + /// This can be calculated from the input schema and the expression but it + /// is cached here for convenience. + std::shared_ptr schema; - const std::shared_ptr& options() const { return options_; } - const std::shared_ptr& fragment() const { return fragment_; } + /// \brief Create a ProjectionDescr by binding an expression to the dataset schema + /// + /// expression must return a struct type + static Result FromStructExpression( + const compute::Expression& expression, const Schema& dataset_schema); - protected: - ScanTask(std::shared_ptr options, std::shared_ptr fragment) - : options_(std::move(options)), fragment_(std::move(fragment)) {} + /// \brief Create a ProjectionDescr from expressions/names for each field + static Result FromExpressions(std::vector exprs, + std::vector names, + const Schema& dataset_schema); - std::shared_ptr options_; - std::shared_ptr fragment_; + /// \brief Create a default projection referencing fields in the dataset schema + static Result FromNames(std::vector names, + const Schema& dataset_schema); + + /// \brief Make a projection that projects every field in the dataset schema + static Result Default(const Schema& dataset_schema); }; +/// \brief Utility method to set the projection expression and schema +ARROW_DS_EXPORT void SetProjection(ScanOptions* options, ProjectionDescr projection); + /// \brief Combines a record batch with the fragment that the record batch originated /// from /// -/// Knowing the source fragment can be useful for debugging & understanding loaded data +/// Knowing the source fragment can be useful for debugging & understanding loaded +/// data struct TaggedRecordBatch { std::shared_ptr record_batch; std::shared_ptr fragment; @@ -248,15 +248,6 @@ class ARROW_DS_EXPORT Scanner { public: virtual ~Scanner() = default; - /// \brief The Scan operator returns a stream of ScanTask. The caller is - /// responsible to dispatch/schedule said tasks. Tasks should be safe to run - /// in a concurrent fashion and outlive the iterator. - /// - /// Note: Not supported by the async scanner - /// Planned for removal from the public API in ARROW-11782. - ARROW_DEPRECATED("Deprecated in 4.0.0 for removal in 5.0.0. Use ScanBatches().") - virtual Result Scan(); - /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple threads /// are used (via use_threads), the visitor will be invoked from those threads and is /// responsible for any synchronization. @@ -283,22 +274,22 @@ class ARROW_DS_EXPORT Scanner { /// /// To make up for the out-of-order iteration each batch is further tagged with /// positional information. - virtual Result ScanBatchesUnordered(); + virtual Result ScanBatchesUnordered() = 0; virtual Result ScanBatchesUnorderedAsync() = 0; /// \brief A convenience to synchronously load the given rows by index. /// /// Will only consume as many batches as needed from ScanBatches(). - virtual Result> TakeRows(const Array& indices); + virtual Result> TakeRows(const Array& indices) = 0; /// \brief Get the first N rows. - virtual Result> Head(int64_t num_rows); + virtual Result> Head(int64_t num_rows) = 0; /// \brief Count rows matching a predicate. /// /// This method will push down the predicate and compute the result based on fragment /// metadata if possible. - virtual Result CountRows(); + virtual Result CountRows() = 0; /// \brief Convert the Scanner to a RecordBatchReader so it can be /// easily used with APIs that expect a reader. - Result> ToRecordBatchReader(); + virtual Result> ToRecordBatchReader() = 0; /// \brief Get the options for this scan. const std::shared_ptr& options() const { return scan_options_; } @@ -377,16 +368,8 @@ class ARROW_DS_EXPORT ScannerBuilder { Status UseThreads(bool use_threads = true); /// \brief Limit how many fragments the scanner will read at once - /// - /// Note: This is only enforced in "async" mode Status FragmentReadahead(int fragment_readahead); - /// \brief Indicate if the Scanner should run in experimental "async" mode - /// - /// This mode should have considerably better performance on high-latency or parallel - /// filesystems but is still experimental - Status UseAsync(bool use_async = true); - /// \brief Set the maximum number of rows per RecordBatch. /// /// \param[in] batch_size the maximum number of rows. @@ -436,21 +419,6 @@ class ARROW_DS_EXPORT ScanNodeOptions : public compute::ExecNodeOptions { /// @} -/// \brief A trivial ScanTask that yields the RecordBatch of an array. -class ARROW_DS_EXPORT InMemoryScanTask : public ScanTask { - public: - InMemoryScanTask(std::vector> record_batches, - std::shared_ptr options, - std::shared_ptr fragment) - : ScanTask(std::move(options), std::move(fragment)), - record_batches_(std::move(record_batches)) {} - - Result Execute() override; - - protected: - std::vector> record_batches_; -}; - namespace internal { ARROW_DS_EXPORT void InitializeScanner(arrow::compute::ExecFactoryRegistry* registry); } // namespace internal diff --git a/cpp/src/arrow/dataset/scanner_benchmark.cc b/cpp/src/arrow/dataset/scanner_benchmark.cc index e3021794c5d52..6d314d9d9a63a 100644 --- a/cpp/src/arrow/dataset/scanner_benchmark.cc +++ b/cpp/src/arrow/dataset/scanner_benchmark.cc @@ -119,8 +119,6 @@ void MinimalEndToEndScan(size_t num_batches, size_t batch_size, bool async_mode) std::make_shared(GetSchema(), batches); auto options = std::make_shared(); - // sync scanning is not supported by ScanNode - options->use_async = true; // specify the filter compute::Expression b_is_true = field_ref("b"); options->filter = b_is_true; diff --git a/cpp/src/arrow/dataset/scanner_internal.h b/cpp/src/arrow/dataset/scanner_internal.h deleted file mode 100644 index 2c78d1b277444..0000000000000 --- a/cpp/src/arrow/dataset/scanner_internal.h +++ /dev/null @@ -1,269 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include - -#include "arrow/array/array_nested.h" -#include "arrow/array/util.h" -#include "arrow/compute/api_scalar.h" -#include "arrow/compute/api_vector.h" -#include "arrow/compute/exec.h" -#include "arrow/dataset/dataset_internal.h" -#include "arrow/dataset/partition.h" -#include "arrow/dataset/scanner.h" -#include "arrow/util/async_generator.h" -#include "arrow/util/logging.h" - -namespace arrow { - -using internal::checked_cast; -using internal::Executor; - -namespace dataset { - -inline Result> FilterSingleBatch( - const std::shared_ptr& in, const compute::Expression& filter, - const std::shared_ptr& options) { - compute::ExecContext exec_context{options->pool}; - ARROW_ASSIGN_OR_RAISE( - Datum mask, - ExecuteScalarExpression(filter, *options->dataset_schema, in, &exec_context)); - - if (mask.is_scalar()) { - const auto& mask_scalar = mask.scalar_as(); - if (mask_scalar.is_valid && mask_scalar.value) { - return in; - } - return in->Slice(0, 0); - } - - ARROW_ASSIGN_OR_RAISE( - Datum filtered, - compute::Filter(in, mask, compute::FilterOptions::Defaults(), &exec_context)); - return filtered.record_batch(); -} - -inline RecordBatchIterator FilterRecordBatch( - RecordBatchIterator it, compute::Expression filter, - const std::shared_ptr& options) { - return MakeMaybeMapIterator( - [=](std::shared_ptr in) -> Result> { - return FilterSingleBatch(in, filter, options); - }, - std::move(it)); -} - -inline Result> ProjectSingleBatch( - const std::shared_ptr& in, const compute::Expression& projection, - const std::shared_ptr& options) { - compute::ExecContext exec_context{options->pool}; - ARROW_ASSIGN_OR_RAISE( - Datum projected, - ExecuteScalarExpression(projection, *options->dataset_schema, in, &exec_context)); - - DCHECK_EQ(projected.type()->id(), Type::STRUCT); - if (projected.shape() == ValueDescr::SCALAR) { - // Only virtual columns are projected. Broadcast to an array - ARROW_ASSIGN_OR_RAISE(projected, MakeArrayFromScalar(*projected.scalar(), - in->num_rows(), options->pool)); - } - - ARROW_ASSIGN_OR_RAISE(auto out, - RecordBatch::FromStructArray(projected.array_as())); - - return out->ReplaceSchemaMetadata(in->schema()->metadata()); -} - -inline RecordBatchIterator ProjectRecordBatch( - RecordBatchIterator it, compute::Expression projection, - const std::shared_ptr& options) { - return MakeMaybeMapIterator( - [=](std::shared_ptr in) -> Result> { - return ProjectSingleBatch(in, projection, options); - }, - std::move(it)); -} - -class FilterAndProjectScanTask : public ScanTask { - public: - explicit FilterAndProjectScanTask(std::shared_ptr task, - compute::Expression partition) - : ScanTask(task->options(), task->fragment()), - task_(std::move(task)), - partition_(std::move(partition)) {} - - Result Execute() override { - ARROW_ASSIGN_OR_RAISE(auto it, task_->Execute()); - - ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_filter, - SimplifyWithGuarantee(options()->filter, partition_)); - - ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_projection, - SimplifyWithGuarantee(options()->projection, partition_)); - - RecordBatchIterator filter_it = - FilterRecordBatch(std::move(it), simplified_filter, options_); - - return ProjectRecordBatch(std::move(filter_it), simplified_projection, options_); - } - - Result ToFilteredAndProjectedIterator( - const RecordBatchVector& rbs) { - auto it = MakeVectorIterator(rbs); - ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_filter, - SimplifyWithGuarantee(options()->filter, partition_)); - - ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_projection, - SimplifyWithGuarantee(options()->projection, partition_)); - - RecordBatchIterator filter_it = - FilterRecordBatch(std::move(it), simplified_filter, options_); - - return ProjectRecordBatch(std::move(filter_it), simplified_projection, options_); - } - - Result> FilterAndProjectBatch( - const std::shared_ptr& batch) { - ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_filter, - SimplifyWithGuarantee(options()->filter, partition_)); - - ARROW_ASSIGN_OR_RAISE(compute::Expression simplified_projection, - SimplifyWithGuarantee(options()->projection, partition_)); - ARROW_ASSIGN_OR_RAISE(auto filtered, - FilterSingleBatch(batch, simplified_filter, options_)); - return ProjectSingleBatch(filtered, simplified_projection, options_); - } - - inline Future SafeExecute(Executor* executor) override { - return task_->SafeExecute(executor).Then( - // This should only be run via SerialExecutor so it should be safe to capture - // `this` - [this](const RecordBatchVector& rbs) -> Result { - ARROW_ASSIGN_OR_RAISE(auto projected_it, ToFilteredAndProjectedIterator(rbs)); - return projected_it.ToVector(); - }); - } - - inline Future<> SafeVisit( - Executor* executor, - std::function)> visitor) override { - auto filter_and_project_visitor = - [this, visitor](const std::shared_ptr& batch) { - ARROW_ASSIGN_OR_RAISE(auto projected, FilterAndProjectBatch(batch)); - return visitor(projected); - }; - return task_->SafeVisit(executor, filter_and_project_visitor); - } - - private: - std::shared_ptr task_; - compute::Expression partition_; -}; - -/// \brief GetScanTaskIterator transforms an Iterator in a -/// flattened Iterator. -inline Result GetScanTaskIterator( - FragmentIterator fragments, std::shared_ptr options) { - // Fragment -> ScanTaskIterator - auto fn = [options](std::shared_ptr fragment) -> Result { - ARROW_ASSIGN_OR_RAISE(auto scan_task_it, fragment->Scan(options)); - - // Skip applying compute on fragments if disabled. - if (!fragment->apply_compute) { - return std::move(scan_task_it); - } - - auto partition = fragment->partition_expression(); - // Apply the filter and/or projection to incoming RecordBatches by - // wrapping the ScanTask with a FilterAndProjectScanTask - auto wrap_scan_task = - [partition](std::shared_ptr task) -> std::shared_ptr { - return std::make_shared(std::move(task), partition); - }; - - return MakeMapIterator(wrap_scan_task, std::move(scan_task_it)); - }; - - // Iterator> - auto maybe_scantask_it = MakeMaybeMapIterator(fn, std::move(fragments)); - - // Iterator - return MakeFlattenIterator(std::move(maybe_scantask_it)); -} - -inline Status NestedFieldRefsNotImplemented() { - // TODO(ARROW-11259) Several functions (for example, IpcScanTask::Make) assume that - // only top level fields will be materialized. - return Status::NotImplemented("Nested field references in scans."); -} - -inline Status SetProjection(ScanOptions* options, const compute::Expression& projection) { - ARROW_ASSIGN_OR_RAISE(options->projection, projection.Bind(*options->dataset_schema)); - - if (options->projection.type()->id() != Type::STRUCT) { - return Status::Invalid("Projection ", projection.ToString(), - " cannot yield record batches"); - } - options->projected_schema = ::arrow::schema( - checked_cast(*options->projection.type()).fields(), - options->dataset_schema->metadata()); - - return Status::OK(); -} - -inline Status SetProjection(ScanOptions* options, std::vector exprs, - std::vector names) { - compute::MakeStructOptions project_options{std::move(names)}; - - for (size_t i = 0; i < exprs.size(); ++i) { - if (auto ref = exprs[i].field_ref()) { - if (!ref->name()) return NestedFieldRefsNotImplemented(); - - // set metadata and nullability for plain field references - ARROW_ASSIGN_OR_RAISE(auto field, ref->GetOne(*options->dataset_schema)); - project_options.field_nullability[i] = field->nullable(); - project_options.field_metadata[i] = field->metadata(); - } - } - - return SetProjection(options, - call("make_struct", std::move(exprs), std::move(project_options))); -} - -inline Status SetProjection(ScanOptions* options, std::vector names) { - std::vector exprs(names.size()); - for (size_t i = 0; i < exprs.size(); ++i) { - exprs[i] = compute::field_ref(names[i]); - } - return SetProjection(options, std::move(exprs), std::move(names)); -} - -inline Status SetFilter(ScanOptions* options, const compute::Expression& filter) { - for (const auto& ref : FieldsInExpression(filter)) { - if (!ref.name()) return NestedFieldRefsNotImplemented(); - - RETURN_NOT_OK(ref.FindOne(*options->dataset_schema)); - } - ARROW_ASSIGN_OR_RAISE(options->filter, filter.Bind(*options->dataset_schema)); - return Status::OK(); -} - -} // namespace dataset -} // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner_test.cc b/cpp/src/arrow/dataset/scanner_test.cc index 45e1b3688ed51..c44849c340280 100644 --- a/cpp/src/arrow/dataset/scanner_test.cc +++ b/cpp/src/arrow/dataset/scanner_test.cc @@ -28,7 +28,6 @@ #include "arrow/compute/cast.h" #include "arrow/compute/exec/exec_plan.h" #include "arrow/dataset/plan.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/dataset/test_util.h" #include "arrow/record_batch.h" #include "arrow/table.h" @@ -55,7 +54,6 @@ using internal::Iota; namespace dataset { struct TestScannerParams { - bool use_async; bool use_threads; int num_child_datasets; int num_batches; @@ -64,8 +62,8 @@ struct TestScannerParams { std::string ToString() const { // GTest requires this to be alphanumeric std::stringstream ss; - ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial") - << num_child_datasets << "d" << num_batches << "b" << items_per_batch << "r"; + ss << (use_threads ? "Threaded" : "Serial") << num_child_datasets << "d" + << num_batches << "b" << items_per_batch << "r"; return ss.str(); } @@ -76,21 +74,16 @@ struct TestScannerParams { static std::vector Values() { std::vector values; - for (int sync = 0; sync < 2; sync++) { - for (int use_threads = 0; use_threads < 2; use_threads++) { - values.push_back( - {static_cast(sync), static_cast(use_threads), 1, 1, 1024}); - values.push_back( - {static_cast(sync), static_cast(use_threads), 2, 16, 1024}); - } + for (int use_threads = 0; use_threads < 2; use_threads++) { + values.push_back({static_cast(use_threads), 1, 1, 1024}); + values.push_back({static_cast(use_threads), 2, 16, 1024}); } return values; } }; std::ostream& operator<<(std::ostream& out, const TestScannerParams& params) { - out << (params.use_async ? "async-" : "sync-") - << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets + out << (params.use_threads ? "threaded-" : "serial-") << params.num_child_datasets << "d-" << params.num_batches << "b-" << params.items_per_batch << "i"; return out; } @@ -100,7 +93,6 @@ class TestScanner : public DatasetFixtureMixinWithParam { std::shared_ptr MakeScanner(std::shared_ptr dataset) { ScannerBuilder builder(std::move(dataset), options_); ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads)); - ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async)); EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish()); return scanner; } @@ -357,8 +349,6 @@ TEST_P(TestScanner, CountRows) { TEST_P(TestScanner, EmptyFragment) { // Regression test for ARROW-13982 - if (!GetParam().use_async) GTEST_SKIP() << "Test only applies to async scanner"; - SetSchema({field("i32", int32()), field("f64", float64())}); auto batch = ConstantArrayGenerator::Zeroes(GetParam().items_per_batch, schema_); auto empty_batch = ConstantArrayGenerator::Zeroes(0, schema_); @@ -401,9 +391,6 @@ class CountRowsOnlyFragment : public InMemoryFragment { } return Future>::MakeFinished(sum); } - Result Scan(std::shared_ptr) override { - return Status::Invalid("Don't scan me!"); - } Result ScanBatchesAsync( const std::shared_ptr&) override { return Status::Invalid("Don't scan me!"); @@ -418,12 +405,6 @@ class ScanOnlyFragment : public InMemoryFragment { compute::Expression predicate, const std::shared_ptr&) override { return Future>::MakeFinished(util::nullopt); } - Result Scan(std::shared_ptr options) override { - auto self = shared_from_this(); - ScanTaskVector tasks{ - std::make_shared(record_batches_, options, self)}; - return MakeVectorIterator(std::move(tasks)); - } Result ScanBatchesAsync( const std::shared_ptr&) override { return MakeVectorGenerator(record_batches_); @@ -440,7 +421,6 @@ TEST_P(TestScanner, CountRowsEmpty) { std::make_shared( schema_, FragmentVector{std::make_shared(batches)}), options_); - ASSERT_OK(builder.UseAsync(GetParam().use_async)); ASSERT_OK(builder.UseThreads(GetParam().use_threads)); ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); ASSERT_OK_AND_EQ(batch->num_rows(), scanner->CountRows()); @@ -469,7 +449,6 @@ TEST_P(TestScanner, CountRowsFailure) { ScannerBuilder builder( std::make_shared(schema_, FragmentVector{fragment1, fragment2}), options_); - ASSERT_OK(builder.UseAsync(GetParam().use_async)); ASSERT_OK(builder.UseThreads(GetParam().use_threads)); ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); fragment1->count.MarkFinished(Status::Invalid("")); @@ -488,7 +467,6 @@ TEST_P(TestScanner, CountRowsWithMetadata) { std::make_shared( schema_, FragmentVector{std::make_shared(batches)}), options_); - ASSERT_OK(builder.UseAsync(GetParam().use_async)); ASSERT_OK(builder.UseThreads(GetParam().use_threads)); ASSERT_OK_AND_ASSIGN(auto scanner, builder.Finish()); ASSERT_OK_AND_EQ(4 * batch->num_rows(), scanner->CountRows()); @@ -520,18 +498,6 @@ TEST_P(TestScanner, ToRecordBatchReader) { class FailingFragment : public InMemoryFragment { public: using InMemoryFragment::InMemoryFragment; - Result Scan(std::shared_ptr options) override { - int index = 0; - auto self = shared_from_this(); - return MakeFunctionIterator([=]() mutable -> Result> { - if (index > 16) { - return Status::Invalid("Oh no, we failed!"); - } - RecordBatchVector batches = {record_batches_[index++ % record_batches_.size()]}; - return std::make_shared(batches, options, self); - }); - } - Result ScanBatchesAsync( const std::shared_ptr& options) override { struct { @@ -550,48 +516,12 @@ class FailingFragment : public InMemoryFragment { } }; -class FailingExecuteScanTask : public InMemoryScanTask { - public: - using InMemoryScanTask::InMemoryScanTask; - - Result Execute() override { - return Status::Invalid("Oh no, we failed!"); - } -}; - -class FailingIterationScanTask : public InMemoryScanTask { - public: - using InMemoryScanTask::InMemoryScanTask; - - Result Execute() override { - int index = 0; - auto batches = record_batches_; - return MakeFunctionIterator( - [index, batches]() mutable -> Result> { - if (index < 1) { - return batches[index++]; - } - return Status::Invalid("Oh no, we failed!"); - }); - } -}; - -template -class FailingScanTaskFragment : public InMemoryFragment { +class FailingScanFragment : public InMemoryFragment { public: using InMemoryFragment::InMemoryFragment; - Result Scan(std::shared_ptr options) override { - auto self = shared_from_this(); - ScanTaskVector scan_tasks; - for (int i = 0; i < 4; i++) { - scan_tasks.push_back(std::make_shared(record_batches_, options, self)); - } - return MakeVectorIterator(std::move(scan_tasks)); - } - // Unlike the sync case, there's only two places to fail - during - // iteration (covered by FailingFragment) or at the initial scan - // (covered here) + // There are two places to fail - during iteration (covered by FailingFragment) or at + // the initial scan (covered here) Result ScanBatchesAsync( const std::shared_ptr& options) override { return Status::Invalid("Oh no, we failed!"); @@ -650,8 +580,7 @@ TEST_P(TestScanner, ScanBatchesFailure) { // Case 2: failure when calling ScanTask::Execute { - FragmentVector fragments{ - std::make_shared>(batches)}; + FragmentVector fragments{std::make_shared(batches)}; auto dataset = std::make_shared(schema_, fragments); auto scanner = MakeScanner(std::move(dataset)); check_scanner(*batch, scanner.get()); @@ -659,8 +588,7 @@ TEST_P(TestScanner, ScanBatchesFailure) { // Case 3: failure when calling RecordBatchIterator::Next { - FragmentVector fragments{ - std::make_shared>(batches)}; + FragmentVector fragments{std::make_shared(batches)}; auto dataset = std::make_shared(schema_, fragments); auto scanner = MakeScanner(std::move(dataset)); check_scanner(*batch, scanner.get()); @@ -707,9 +635,6 @@ TEST_P(TestScanner, Head) { } TEST_P(TestScanner, FromReader) { - if (GetParam().use_async) { - GTEST_SKIP() << "Async scanner does not support construction from reader"; - } auto batch_size = GetParam().items_per_batch; auto num_batches = GetParam().num_batches; @@ -747,10 +672,6 @@ class ControlledFragment : public Fragment { record_batch_generator_(), tracking_generator_(record_batch_generator_) {} - Result Scan(std::shared_ptr options) override { - return Status::NotImplemented( - "Not needed for testing. Sync can only return things in-order."); - } Result> ReadPhysicalSchemaImpl() override { return physical_schema_; } @@ -927,8 +848,6 @@ class TestReordering : public ::testing::Test { std::shared_ptr MakeScanner(int fragment_readahead = 0) { ScannerBuilder builder(dataset_); - // Reordering tests only make sense for async - ARROW_EXPECT_OK(builder.UseAsync(true)); if (fragment_readahead != 0) { ARROW_EXPECT_OK(builder.FragmentReadahead(fragment_readahead)); } @@ -1014,7 +933,6 @@ class TestBackpressure : public ::testing::Test { std::shared_ptr options = std::make_shared(); ScannerBuilder builder(std::move(dataset), options); ARROW_EXPECT_OK(builder.UseThreads(true)); - ARROW_EXPECT_OK(builder.UseAsync(true)); ARROW_EXPECT_OK(builder.FragmentReadahead(4)); EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish()); return scanner; @@ -1208,9 +1126,15 @@ TEST(ScanOptions, TestMaterializedFields) { auto i64 = field("i64", int64()); auto opts = std::make_shared(); + auto set_projection_from_names = [&opts](std::vector names) { + ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames( + std::move(names), *opts->dataset_schema)); + SetProjection(opts.get(), std::move(projection)); + }; + // empty dataset, project nothing = nothing materialized opts->dataset_schema = schema({}); - ASSERT_OK(SetProjection(opts.get(), {}, {})); + set_projection_from_names({}); EXPECT_THAT(opts->MaterializedFields(), IsEmpty()); // non-empty dataset, project nothing = nothing materialized @@ -1223,17 +1147,20 @@ TEST(ScanOptions, TestMaterializedFields) { // project i32 & i64, filter nothing = materialize i32 & i64 opts->filter = literal(true); - ASSERT_OK(SetProjection(opts.get(), {"i32", "i64"})); + set_projection_from_names({"i32", "i64"}); EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64")); // project i32 + i64, filter nothing = materialize i32 & i64 opts->filter = literal(true); - ASSERT_OK(SetProjection(opts.get(), {call("add", {field_ref("i32"), field_ref("i64")})}, - {"i32 + i64"})); + ASSERT_OK_AND_ASSIGN(auto projection, + ProjectionDescr::FromExpressions( + {call("add", {field_ref("i32"), field_ref("i64")})}, + {"i32 + i64"}, *opts->dataset_schema)); + SetProjection(opts.get(), std::move(projection)); EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32", "i64")); // project i32, filter nothing = materialize i32 - ASSERT_OK(SetProjection(opts.get(), {"i32"})); + set_projection_from_names({"i32"}); EXPECT_THAT(opts->MaterializedFields(), ElementsAre("i32")); // project i32, filter on i32 = materialize i32 (reported twice) @@ -1250,7 +1177,6 @@ TEST(ScanOptions, TestMaterializedFields) { } namespace { - struct TestPlan { explicit TestPlan(compute::ExecContext* ctx = compute::default_exec_context()) : plan(compute::ExecPlan::Make(ctx).ValueOrDie()) { @@ -1335,7 +1261,8 @@ DatasetAndBatches MakeBasicDataset() { // ... and with the last-in-fragment flag batches.back().values.emplace_back(batch_index == 1); - // each batch carries a guarantee inherited from its Fragment's partition expression + // each batch carries a guarantee inherited from its Fragment's partition + // expression batches.back().guarantee = equal(field_ref("c"), literal(fragment_index == 0 ? 23 : 47)); } @@ -1367,7 +1294,6 @@ TEST(ScanNode, Schema) { auto basic = MakeBasicDataset(); auto options = std::make_shared(); - options->use_async = true; options->projection = Materialize({}); // set an empty projection ASSERT_OK_AND_ASSIGN(auto scan, @@ -1378,8 +1304,8 @@ TEST(ScanNode, Schema) { fields.push_back(field("__fragment_index", int32())); fields.push_back(field("__batch_index", int32())); fields.push_back(field("__last_in_fragment", boolean())); - // output_schema is *always* the full augmented dataset schema, regardless of projection - // (but some columns *may* be placeholder null Scalars if not projected) + // output_schema is *always* the full augmented dataset schema, regardless of + // projection (but some columns *may* be placeholder null Scalars if not projected) AssertSchemaEqual(Schema(fields), *scan->output_schema()); } @@ -1389,7 +1315,6 @@ TEST(ScanNode, Trivial) { auto basic = MakeBasicDataset(); auto options = std::make_shared(); - options->use_async = true; // ensure all fields are materialized options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); @@ -1411,7 +1336,6 @@ TEST(ScanNode, FilteredOnVirtualColumn) { auto basic = MakeBasicDataset(); auto options = std::make_shared(); - options->use_async = true; options->filter = less(field_ref("c"), literal(30)); // ensure all fields are materialized options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); @@ -1438,7 +1362,6 @@ TEST(ScanNode, DeferredFilterOnPhysicalColumn) { auto basic = MakeBasicDataset(); auto options = std::make_shared(); - options->use_async = true; options->filter = greater(field_ref("a"), literal(4)); // ensure all fields are materialized options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); @@ -1464,7 +1387,6 @@ TEST(ScanNode, DISABLED_ProjectionPushdown) { auto basic = MakeBasicDataset(); auto options = std::make_shared(); - options->use_async = true; options->projection = Materialize({"b"}, /*include_aug_fields=*/true); ASSERT_OK(compute::Declaration::Sequence( @@ -1493,7 +1415,6 @@ TEST(ScanNode, MaterializationOfVirtualColumn) { auto basic = MakeBasicDataset(); auto options = std::make_shared(); - options->use_async = true; options->projection = Materialize({"a", "b", "c"}, /*include_aug_fields=*/true); ASSERT_OK(compute::Declaration::Sequence( @@ -1529,7 +1450,8 @@ TEST(ScanNode, MinimalEndToEnd) { // A ScanNode is constructed from an ExecPlan (into which it is inserted), // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for - // predicate pushdown, a projection to skip materialization of unnecessary columns, ...) + // predicate pushdown, a projection to skip materialization of unnecessary columns, + // ...) ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, compute::ExecPlan::Make(&exec_context)); @@ -1548,8 +1470,6 @@ TEST(ScanNode, MinimalEndToEnd) { })); auto options = std::make_shared(); - // sync scanning is not supported by ScanNode - options->use_async = true; // specify the filter compute::Expression b_is_true = field_ref("b"); options->filter = b_is_true; @@ -1625,7 +1545,8 @@ TEST(ScanNode, MinimalScalarAggEndToEnd) { // A ScanNode is constructed from an ExecPlan (into which it is inserted), // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for - // predicate pushdown, a projection to skip materialization of unnecessary columns, ...) + // predicate pushdown, a projection to skip materialization of unnecessary columns, + // ...) ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, compute::ExecPlan::Make(&exec_context)); @@ -1644,8 +1565,6 @@ TEST(ScanNode, MinimalScalarAggEndToEnd) { })); auto options = std::make_shared(); - // sync scanning is not supported by ScanNode - options->use_async = true; // specify the filter compute::Expression b_is_true = field_ref("b"); options->filter = b_is_true; @@ -1720,7 +1639,8 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) { // A ScanNode is constructed from an ExecPlan (into which it is inserted), // a Dataset (whose batches will be scanned), and ScanOptions (to specify a filter for - // predicate pushdown, a projection to skip materialization of unnecessary columns, ...) + // predicate pushdown, a projection to skip materialization of unnecessary columns, + // ...) ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, compute::ExecPlan::Make(&exec_context)); @@ -1739,8 +1659,6 @@ TEST(ScanNode, MinimalGroupedAggEndToEnd) { })); auto options = std::make_shared(); - // sync scanning is not supported by ScanNode - options->use_async = true; // specify the filter compute::Expression b_is_true = field_ref("b"); options->filter = b_is_true; diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index 722046e5effb6..7423abed6651c 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -31,11 +31,11 @@ #include #include +#include "arrow/array.h" #include "arrow/compute/exec/expression.h" #include "arrow/dataset/dataset_internal.h" #include "arrow/dataset/discovery.h" #include "arrow/dataset/file_base.h" -#include "arrow/dataset/scanner_internal.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/mockfs.h" #include "arrow/filesystem/path_util.h" @@ -129,16 +129,16 @@ class DatasetFixtureMixin : public ::testing::Test { public: /// \brief Ensure that record batches found in reader are equals to the /// record batches yielded by the data fragment. - void AssertScanTaskEquals(RecordBatchReader* expected, ScanTask* task, + void AssertScanTaskEquals(RecordBatchReader* expected, RecordBatchGenerator batch_gen, bool ensure_drained = true) { - ASSERT_OK_AND_ASSIGN(auto it, task->Execute()); - ARROW_EXPECT_OK(it.Visit([expected](std::shared_ptr rhs) -> Status { - std::shared_ptr lhs; - RETURN_NOT_OK(expected->ReadNext(&lhs)); - EXPECT_NE(lhs, nullptr); - AssertBatchesEqual(*lhs, *rhs); - return Status::OK(); - })); + ASSERT_FINISHES_OK(VisitAsyncGenerator( + batch_gen, [expected](std::shared_ptr rhs) -> Status { + std::shared_ptr lhs; + RETURN_NOT_OK(expected->ReadNext(&lhs)); + EXPECT_NE(lhs, nullptr); + AssertBatchesEqual(*lhs, *rhs); + return Status::OK(); + })); if (ensure_drained) { EnsureRecordBatchReaderDrained(expected); @@ -157,12 +157,8 @@ class DatasetFixtureMixin : public ::testing::Test { /// record batches yielded by the data fragment. void AssertFragmentEquals(RecordBatchReader* expected, Fragment* fragment, bool ensure_drained = true) { - ASSERT_OK_AND_ASSIGN(auto it, fragment->Scan(options_)); - - ARROW_EXPECT_OK(it.Visit([&](std::shared_ptr task) -> Status { - AssertScanTaskEquals(expected, task.get(), false); - return Status::OK(); - })); + ASSERT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(options_)); + AssertScanTaskEquals(expected, batch_gen); if (ensure_drained) { EnsureRecordBatchReaderDrained(expected); @@ -290,7 +286,9 @@ class DatasetFixtureMixin : public ::testing::Test { schema_ = schema(std::move(fields)); options_ = std::make_shared(); options_->dataset_schema = schema_; - ASSERT_OK(SetProjection(options_.get(), schema_->field_names())); + ASSERT_OK_AND_ASSIGN(auto projection, + ProjectionDescr::FromNames(schema_->field_names(), *schema_)); + SetProjection(options_.get(), std::move(projection)); SetFilter(literal(true)); } @@ -299,7 +297,10 @@ class DatasetFixtureMixin : public ::testing::Test { } void SetProjectedColumns(std::vector column_names) { - ASSERT_OK(SetProjection(options_.get(), std::move(column_names))); + ASSERT_OK_AND_ASSIGN( + auto projection, + ProjectionDescr::FromNames(std::move(column_names), *options_->dataset_schema)); + SetProjection(options_.get(), std::move(projection)); } std::shared_ptr schema_; @@ -311,7 +312,6 @@ class DatasetFixtureMixinWithParam : public DatasetFixtureMixin, public ::testing::WithParamInterface

{}; struct TestFormatParams { - bool use_async; bool use_threads; int num_batches; int items_per_batch; @@ -321,8 +321,8 @@ struct TestFormatParams { std::string ToString() const { // GTest requires this to be alphanumeric std::stringstream ss; - ss << (use_async ? "Async" : "Sync") << (use_threads ? "Threaded" : "Serial") - << num_batches << "b" << items_per_batch << "r"; + ss << (use_threads ? "Threaded" : "Serial") << num_batches << "b" << items_per_batch + << "r"; return ss.str(); } @@ -333,10 +333,8 @@ struct TestFormatParams { static std::vector Values() { std::vector values; - for (const bool async : std::vector{true, false}) { - for (const bool use_threads : std::vector{true, false}) { - values.push_back(TestFormatParams{async, use_threads, 16, 1024}); - } + for (const bool use_threads : std::vector{true, false}) { + values.push_back(TestFormatParams{use_threads, 16, 1024}); } return values; } @@ -397,7 +395,9 @@ class FileFormatFixtureMixin : public ::testing::Test { void SetSchema(std::vector> fields) { opts_->dataset_schema = schema(std::move(fields)); - ASSERT_OK(SetProjection(opts_.get(), opts_->dataset_schema->field_names())); + ASSERT_OK_AND_ASSIGN(auto projection, + ProjectionDescr::Default(*opts_->dataset_schema)); + SetProjection(opts_.get(), std::move(projection)); } void SetFilter(compute::Expression filter) { @@ -405,7 +405,9 @@ class FileFormatFixtureMixin : public ::testing::Test { } void Project(std::vector names) { - ASSERT_OK(SetProjection(opts_.get(), std::move(names))); + ASSERT_OK_AND_ASSIGN(auto projection, ProjectionDescr::FromNames( + std::move(names), *opts_->dataset_schema)); + SetProjection(opts_.get(), std::move(projection)); } // Shared test cases @@ -552,7 +554,6 @@ class FileFormatScanMixin : public FileFormatFixtureMixin, auto dataset = std::make_shared(opts_->dataset_schema, FragmentVector{fragment}); ScannerBuilder builder(dataset, opts_); - ARROW_EXPECT_OK(builder.UseAsync(GetParam().use_async)); ARROW_EXPECT_OK(builder.UseThreads(GetParam().use_threads)); EXPECT_OK_AND_ASSIGN(auto scanner, builder.Finish()); EXPECT_OK_AND_ASSIGN(auto batch_it, scanner->ScanBatches()); @@ -563,15 +564,9 @@ class FileFormatScanMixin : public FileFormatFixtureMixin, // Scan the fragment directly, without using the scanner. RecordBatchIterator PhysicalBatches(std::shared_ptr fragment) { opts_->use_threads = GetParam().use_threads; - if (GetParam().use_async) { - EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); - auto batch_it = MakeGeneratorIterator(std::move(batch_gen)); - return batch_it; - } - EXPECT_OK_AND_ASSIGN(auto scan_task_it, fragment->Scan(opts_)); - return MakeFlattenIterator(MakeMaybeMapIterator( - [](std::shared_ptr scan_task) { return scan_task->Execute(); }, - std::move(scan_task_it))); + EXPECT_OK_AND_ASSIGN(auto batch_gen, fragment->ScanBatchesAsync(opts_)); + auto batch_it = MakeGeneratorIterator(std::move(batch_gen)); + return batch_it; } // Shared test cases @@ -733,11 +728,11 @@ class DummyFileFormat : public FileFormat { return schema_; } - /// \brief Open a file for scanning (always returns an empty iterator) - Result ScanFile( + /// \brief Open a file for scanning (always returns an empty generator) + Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& fragment) const override { - return MakeEmptyIterator>(); + return MakeEmptyGenerator>(); } Result> MakeWriter( @@ -774,8 +769,7 @@ class JSONRecordBatchFileFormat : public FileFormat { return resolver_(source); } - /// \brief Open a file for scanning - Result ScanFile( + Result ScanBatchesAsync( const std::shared_ptr& options, const std::shared_ptr& fragment) const override { ARROW_ASSIGN_OR_RAISE(auto file, fragment->source().Open()); @@ -784,8 +778,7 @@ class JSONRecordBatchFileFormat : public FileFormat { ARROW_ASSIGN_OR_RAISE(auto schema, Inspect(fragment->source())); RecordBatchVector batches{RecordBatchFromJSON(schema, util::string_view{*buffer})}; - return std::make_shared(std::move(schema), std::move(batches)) - ->Scan(std::move(options)); + return MakeVectorGenerator(std::move(batches)); } Result> MakeWriter( @@ -1069,7 +1062,10 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { scan_options_ = std::make_shared(); scan_options_->dataset_schema = dataset_->schema(); - ASSERT_OK(SetProjection(scan_options_.get(), source_schema_->field_names())); + ASSERT_OK_AND_ASSIGN( + auto projection, + ProjectionDescr::FromNames(source_schema_->field_names(), *dataset_->schema())); + SetProjection(scan_options_.get(), std::move(projection)); } void SetWriteOptions(std::shared_ptr file_write_options) { @@ -1086,7 +1082,6 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { void DoWrite(std::shared_ptr desired_partitioning) { write_options_.partitioning = desired_partitioning; auto scanner_builder = ScannerBuilder(dataset_, scan_options_); - ASSERT_OK(scanner_builder.UseAsync(true)); ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder.Finish()); ASSERT_OK(FileSystemDataset::Write(write_options_, scanner)); @@ -1275,7 +1270,9 @@ class WriteFileSystemDatasetMixin : public MakeFileSystemDatasetMixin { compute::SortIndices(batch->GetColumnByName("sales"), compute::SortOptions({compute::SortKey{"sales"}}))); ASSERT_OK_AND_ASSIGN(Datum sorted_batch, compute::Take(batch, sort_indices)); - ASSERT_OK_AND_ASSIGN(actual_struct, sorted_batch.record_batch()->ToStructArray()); + ASSERT_OK_AND_ASSIGN(auto struct_array, + sorted_batch.record_batch()->ToStructArray()); + actual_struct = std::dynamic_pointer_cast(struct_array); } auto expected_struct = ArrayFromJSON(struct_(expected_physical_schema_->fields()), diff --git a/cpp/src/jni/dataset/jni_wrapper.cc b/cpp/src/jni/dataset/jni_wrapper.cc index 558f0880b52bc..041542804ce86 100644 --- a/cpp/src/jni/dataset/jni_wrapper.cc +++ b/cpp/src/jni/dataset/jni_wrapper.cc @@ -18,6 +18,7 @@ #include #include "arrow/array.h" +#include "arrow/array/concatenate.h" #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" @@ -476,7 +477,18 @@ JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecor std::vector> buffers; for (int i = 0; i < schema->num_fields(); ++i) { + // TODO: If the array has an offset then we need to de-offset the array + // in order for it to be properly consumed on the Java end. + // This forces a copy, it would be nice to avoid this if Java + // could consume offset-arrays. Perhaps at some point in the future + // using the C data interface. See ARROW-15275 + // + // Generally a non-zero offset will occur whenever the scanner batch + // size is smaller than the batch size of the underlying files. auto column = record_batch->column(i); + if (column->offset() != 0) { + column = JniGetOrThrow(arrow::Concatenate({column})); + } auto dataArray = column->data(); jobject field = env->NewObject(record_batch_handle_field_class, record_batch_handle_field_constructor, diff --git a/docs/source/python/dataset.rst b/docs/source/python/dataset.rst index 5af24a0b08753..8507807e83ad6 100644 --- a/docs/source/python/dataset.rst +++ b/docs/source/python/dataset.rst @@ -636,7 +636,7 @@ into memory: # other method that yields record batches. In addition, you can pass a dataset # into write_dataset directly but this method is useful if you want to customize # the scanner (e.g. to filter the input dataset or set a maximum batch size) - scanner = input_dataset.scanner(use_async=True) + scanner = input_dataset.scanner() ds.write_dataset(scanner, "repartitioned_dataset", format="parquet", partitioning=new_part) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index cca6f1b940135..4c50aabf549a8 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -446,11 +446,10 @@ cdef class Dataset(_Weakrefable): use_threads : bool, default True If enabled, then maximum parallelism will be used determined by the number of available CPU cores. - use_async : bool, default False - If enabled, an async scanner will be used that should offer - better performance with high-latency/highly-parallel filesystems - (e.g. S3) - + use_async : bool, default True + This flag is deprecated and is being kept for this release for + backwards compatibility. It will be removed in the next + release. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -2162,8 +2161,7 @@ _DEFAULT_BATCH_SIZE = 2**20 cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, object columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, - bint use_threads=True, bint use_async=False, - MemoryPool memory_pool=None, + bint use_threads=True, MemoryPool memory_pool=None, FragmentScanOptions fragment_scan_options=None)\ except *: cdef: @@ -2198,7 +2196,6 @@ cdef void _populate_builder(const shared_ptr[CScannerBuilder]& ptr, check_status(builder.BatchSize(batch_size)) check_status(builder.UseThreads(use_threads)) - check_status(builder.UseAsync(use_async)) if memory_pool: check_status(builder.Pool(maybe_unbox_memory_pool(memory_pool))) if fragment_scan_options: @@ -2239,10 +2236,9 @@ cdef class Scanner(_Weakrefable): use_threads : bool, default True If enabled, then maximum parallelism will be used determined by the number of available CPU cores. - use_async : bool, default False - If enabled, an async scanner will be used that should offer - better performance with high-latency/highly-parallel filesystems - (e.g. S3) + use_async : bool, default True + This flag is deprecated and is being kept for this release for + backwards compatibility. It will be removed in the next release. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -2270,7 +2266,7 @@ cdef class Scanner(_Weakrefable): @staticmethod def from_dataset(Dataset dataset not None, - bint use_threads=True, bint use_async=False, + bint use_threads=True, object use_async=None, MemoryPool memory_pool=None, object columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, @@ -2292,10 +2288,10 @@ cdef class Scanner(_Weakrefable): use_threads : bool, default True If enabled, then maximum parallelism will be used determined by the number of available CPU cores. - use_async : bool, default False - If enabled, an async scanner will be used that should offer - better performance with high-latency/highly-parallel filesystems - (e.g. S3) + use_async : bool, default N/A + This flag is deprecated and is being kept for this release for + backwards compatibility. It will be removed in the next + release. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -2307,10 +2303,15 @@ cdef class Scanner(_Weakrefable): shared_ptr[CScannerBuilder] builder shared_ptr[CScanner] scanner + if use_async is not None: + warnings.warn('The use_async flag is deprecated and has no ' + 'effect. It will be removed in the next release.', + FutureWarning) + builder = make_shared[CScannerBuilder](dataset.unwrap(), options) _populate_builder(builder, columns=columns, filter=filter, batch_size=batch_size, use_threads=use_threads, - use_async=use_async, memory_pool=memory_pool, + memory_pool=memory_pool, fragment_scan_options=fragment_scan_options) scanner = GetResultValue(builder.get().Finish()) @@ -2318,7 +2319,7 @@ cdef class Scanner(_Weakrefable): @staticmethod def from_fragment(Fragment fragment not None, Schema schema=None, - bint use_threads=True, bint use_async=False, + bint use_threads=True, object use_async=None, MemoryPool memory_pool=None, object columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, @@ -2342,10 +2343,10 @@ cdef class Scanner(_Weakrefable): use_threads : bool, default True If enabled, then maximum parallelism will be used determined by the number of available CPU cores. - use_async : bool, default False - If enabled, an async scanner will be used that should offer - better performance with high-latency/highly-parallel filesystems - (e.g. S3) + use_async : bool, default N/A + This flag is deprecated and is being kept for this release for + backwards compatibility. It will be removed in the next + release. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -2359,11 +2360,16 @@ cdef class Scanner(_Weakrefable): schema = schema or fragment.physical_schema + if use_async is not None: + warnings.warn('The use_async flag is deprecated and has no ' + 'effect. It will be removed in the next release.', + FutureWarning) + builder = make_shared[CScannerBuilder](pyarrow_unwrap_schema(schema), fragment.unwrap(), options) _populate_builder(builder, columns=columns, filter=filter, batch_size=batch_size, use_threads=use_threads, - use_async=use_async, memory_pool=memory_pool, + memory_pool=memory_pool, fragment_scan_options=fragment_scan_options) scanner = GetResultValue(builder.get().Finish()) @@ -2371,9 +2377,8 @@ cdef class Scanner(_Weakrefable): @staticmethod def from_batches(source, Schema schema=None, bint use_threads=True, - bint use_async=False, - MemoryPool memory_pool=None, object columns=None, - Expression filter=None, + object use_async=None, MemoryPool memory_pool=None, + object columns=None, Expression filter=None, int batch_size=_DEFAULT_BATCH_SIZE, FragmentScanOptions fragment_scan_options=None): """ @@ -2399,10 +2404,10 @@ cdef class Scanner(_Weakrefable): use_threads : bool, default True If enabled, then maximum parallelism will be used determined by the number of available CPU cores. - use_async : bool, default False - If enabled, an async scanner will be used that should offer - better performance with high-latency/highly-parallel filesystems - (e.g. S3) + use_async : bool, default True + This flag is deprecated and is being kept for this release for + backwards compatibility. It will be removed in the next + release. memory_pool : MemoryPool, default None For memory allocations, if required. If not specified, uses the default pool. @@ -2429,9 +2434,15 @@ cdef class Scanner(_Weakrefable): 'batches instead of the given type: ' + type(source).__name__) builder = CScannerBuilder.FromRecordBatchReader(reader.reader) + + if use_async is not None: + warnings.warn('The use_async flag is deprecated and has no ' + 'effect. It will be removed in the next release.', + FutureWarning) + _populate_builder(builder, columns=columns, filter=filter, batch_size=batch_size, use_threads=use_threads, - use_async=use_async, memory_pool=memory_pool, + memory_pool=memory_pool, fragment_scan_options=fragment_scan_options) scanner = GetResultValue(builder.get().Finish()) return Scanner.wrap(scanner) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index de0d3a74dfb0c..6e71b5a51aad9 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -1013,11 +1013,11 @@ cdef class Array(_PandasConvertible): Parameters ---------- indent : int, default 2 - How much to indent the internal items in the string to + How much to indent the internal items in the string to the right, by default ``2``. top_level_indent : int, default 0 How much to indent right the entire content of the array, - by default ``0``. + by default ``0``. window : int How many items to preview at the begin and end of the array when the arrays is bigger than the window. diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index c7d472252312e..7745c017893a5 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -866,7 +866,7 @@ def file_visitor(written_file): schema = schema or data.schema data = InMemoryDataset(data, schema=schema) elif isinstance(data, pa.ipc.RecordBatchReader) or _is_iterable(data): - data = Scanner.from_batches(data, schema=schema, use_async=True) + data = Scanner.from_batches(data, schema=schema) schema = None elif not isinstance(data, (Dataset, Scanner)): raise ValueError( @@ -920,7 +920,7 @@ def file_visitor(written_file): filesystem, base_dir = _resolve_filesystem_and_path(base_dir, filesystem) if isinstance(data, Dataset): - scanner = data.scanner(use_threads=use_threads, use_async=True) + scanner = data.scanner(use_threads=use_threads) else: # scanner was passed directly by the user, in which case a schema # cannot be passed diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 5fc8b27f2ace5..2e9a1bcfa680e 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -160,7 +160,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: CStatus Project(vector[CExpression]& exprs, vector[c_string]& columns) CStatus Filter(CExpression filter) CStatus UseThreads(c_bool use_threads) - CStatus UseAsync(c_bool use_async) CStatus Pool(CMemoryPool* pool) CStatus BatchSize(int64_t batch_size) CStatus FragmentScanOptions( diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 55e4ca4725b61..1be59e04199d8 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -215,34 +215,27 @@ def dataset(mockfs): @pytest.fixture(params=[ - (True, True), - (True, False), - (False, True), - (False, False) -], ids=['threaded-async', 'threaded-sync', 'serial-async', 'serial-sync']) + (True), + (False) +], ids=['threaded', 'serial']) def dataset_reader(request): ''' Fixture which allows dataset scanning operations to be - run with/without threads and with/without async + run with/without threads ''' - use_threads, use_async = request.param + use_threads = request.param class reader: def __init__(self): self.use_threads = use_threads - self.use_async = use_async def _patch_kwargs(self, kwargs): if 'use_threads' in kwargs: raise Exception( ('Invalid use of dataset_reader, do not specify' ' use_threads')) - if 'use_async' in kwargs: - raise Exception( - 'Invalid use of dataset_reader, do not specify use_async') kwargs['use_threads'] = use_threads - kwargs['use_async'] = use_async def to_table(self, dataset, **kwargs): self._patch_kwargs(kwargs) @@ -428,6 +421,31 @@ def test_scanner(dataset, dataset_reader): assert table.num_rows == scanner.count_rows() +def test_scanner_async_deprecated(dataset): + with pytest.warns(FutureWarning): + dataset.scanner(use_async=False) + with pytest.warns(FutureWarning): + dataset.scanner(use_async=True) + with pytest.warns(FutureWarning): + dataset.to_table(use_async=False) + with pytest.warns(FutureWarning): + dataset.to_table(use_async=True) + with pytest.warns(FutureWarning): + dataset.head(1, use_async=False) + with pytest.warns(FutureWarning): + dataset.head(1, use_async=True) + with pytest.warns(FutureWarning): + ds.Scanner.from_dataset(dataset, use_async=False) + with pytest.warns(FutureWarning): + ds.Scanner.from_dataset(dataset, use_async=True) + with pytest.warns(FutureWarning): + ds.Scanner.from_fragment( + next(dataset.get_fragments()), use_async=False) + with pytest.warns(FutureWarning): + ds.Scanner.from_fragment( + next(dataset.get_fragments()), use_async=True) + + @pytest.mark.parquet def test_head(dataset, dataset_reader): result = dataset_reader.head(dataset, 0) @@ -2105,10 +2123,8 @@ def test_construct_in_memory(dataset_reader): assert pa.Table.from_batches(list(dataset.to_batches())) == table -@pytest.mark.parametrize('use_threads,use_async', - [(False, False), (False, True), - (True, False), (True, True)]) -def test_scan_iterator(use_threads, use_async): +@pytest.mark.parametrize('use_threads', [False, True]) +def test_scan_iterator(use_threads): batch = pa.RecordBatch.from_arrays([pa.array(range(10))], names=["a"]) table = pa.Table.from_batches([batch]) # When constructed from readers/iterators, should be one-shot @@ -2120,8 +2136,7 @@ def test_scan_iterator(use_threads, use_async): ): # Scanning the fragment consumes the underlying iterator scanner = ds.Scanner.from_batches( - factory(), schema=schema, use_threads=use_threads, - use_async=use_async) + factory(), schema=schema, use_threads=use_threads) assert scanner.to_table() == table with pytest.raises(pa.ArrowInvalid, match=match): scanner.to_table() @@ -3472,7 +3487,7 @@ def test_write_dataset_with_scanner(tempdir): dataset = ds.dataset(tempdir, format='ipc', partitioning=["b"]) with tempfile.TemporaryDirectory() as tempdir2: - ds.write_dataset(dataset.scanner(columns=["b", "c"], use_async=True), + ds.write_dataset(dataset.scanner(columns=["b", "c"]), tempdir2, format='ipc', partitioning=["b"]) load_back = ds.dataset(tempdir2, format='ipc', partitioning=["b"]) @@ -3511,8 +3526,7 @@ def counting_generator(): yield batch scanner = ds.Scanner.from_batches( - counting_generator(), schema=schema, use_threads=True, - use_async=True) + counting_generator(), schema=schema, use_threads=True) write_thread = threading.Thread( target=lambda: ds.write_dataset( @@ -3974,11 +3988,6 @@ def test_write_iterable(tempdir): def test_write_scanner(tempdir, dataset_reader): - if not dataset_reader.use_async: - pytest.skip( - ('ARROW-13338: Write dataset with scanner does not' - ' support synchronous scan')) - table = pa.table([ pa.array(range(20)), pa.array(np.random.randn(20)), pa.array(np.repeat(['a', 'b'], 10)) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 46948097388ca..ef1646b1cf43e 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -644,10 +644,6 @@ dataset___ScannerBuilder__UseThreads <- function(sb, threads) { invisible(.Call(`_arrow_dataset___ScannerBuilder__UseThreads`, sb, threads)) } -dataset___ScannerBuilder__UseAsync <- function(sb, use_async) { - invisible(.Call(`_arrow_dataset___ScannerBuilder__UseAsync`, sb, use_async)) -} - dataset___ScannerBuilder__BatchSize <- function(sb, batch_size) { invisible(.Call(`_arrow_dataset___ScannerBuilder__BatchSize`, sb, batch_size)) } @@ -688,10 +684,6 @@ dataset___Scanner__schema <- function(sc) { .Call(`_arrow_dataset___Scanner__schema`, sc) } -dataset___ScanTask__get_batches <- function(scan_task) { - .Call(`_arrow_dataset___ScanTask__get_batches`, scan_task) -} - dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions) { invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions)) } diff --git a/r/R/dataset-scan.R b/r/R/dataset-scan.R index 21a5056f7e102..dd96d7ebee2aa 100644 --- a/r/R/dataset-scan.R +++ b/r/R/dataset-scan.R @@ -33,8 +33,8 @@ #' * `filter`: A `Expression` to filter the scanned rows by, or `TRUE` (default) #' to keep all rows. #' * `use_threads`: logical: should scanning use multithreading? Default `TRUE` -#' * `use_async`: logical: should the async scanner (performs better on -#' high-latency/highly parallel filesystems like S3) be used? Default `TRUE` +#' * `use_async`: logical: deprecated, this field no longer has any effect on +#' behavior. #' * `...`: Additional arguments, currently ignored #' @section Methods: #' `ScannerBuilder` has the following methods: @@ -45,7 +45,7 @@ #' - `$UseThreads(threads)`: logical: should the scan use multithreading? #' The method's default input is `TRUE`, but you must call the method to enable #' multithreading because the scanner default is `FALSE`. -#' - `$UseAsync(use_async)`: logical: should the async scanner be used? +#' - `$UseAsync(use_async)`: logical: deprecated, has no effect #' - `$BatchSize(batch_size)`: integer: Maximum row count of scanned record #' batches, default is 32K. If scanned record batches are overflowing memory #' then this method can be called to reduce their size. @@ -73,10 +73,15 @@ Scanner$create <- function(dataset, projection = NULL, filter = TRUE, use_threads = option_use_threads(), - use_async = getOption("arrow.use_async", TRUE), + use_async = NULL, batch_size = NULL, fragment_scan_options = NULL, ...) { + if (!is.null(use_async)) { + .Deprecated(msg = paste0("The parameter 'use_async' is deprecated ", + "and will be removed in a future release.")) + } + if (inherits(dataset, "arrow_dplyr_query")) { if (is_collapsed(dataset)) { # TODO: Is there a way to get a RecordBatchReader rather than evaluating? @@ -107,7 +112,6 @@ Scanner$create <- function(dataset, proj, dataset$filtered_rows, use_threads, - use_async, batch_size, fragment_scan_options, ... @@ -118,9 +122,6 @@ Scanner$create <- function(dataset, if (use_threads) { scanner_builder$UseThreads() } - if (use_async) { - scanner_builder$UseAsync() - } if (!is.null(projection)) { scanner_builder$Project(projection) } @@ -159,13 +160,6 @@ tail.Scanner <- function(x, n = 6L, ...) { Table$create(!!!rev(result)) } -ScanTask <- R6Class("ScanTask", - inherit = ArrowObject, - public = list( - Execute = function() dataset___ScanTask__get_batches(self) - ) -) - #' Apply a function to a stream of RecordBatches #' #' As an alternative to calling `collect()` on a `Dataset` query, you can @@ -247,7 +241,8 @@ ScannerBuilder <- R6Class("ScannerBuilder", self }, UseAsync = function(use_async = TRUE) { - dataset___ScannerBuilder__UseAsync(self, use_async) + .Deprecated(msg = paste0("The function 'UseAsync' is deprecated and ", + "will be removed in a future release.")) self }, BatchSize = function(batch_size) { diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 059922d2fd768..6185f045eb34e 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -123,7 +123,7 @@ write_dataset <- function(dataset, dataset <- dplyr::ungroup(dataset) } - scanner <- Scanner$create(dataset, use_async = TRUE) + scanner <- Scanner$create(dataset) if (!inherits(partitioning, "Partitioning")) { partition_schema <- scanner$schema[partitioning] if (isTRUE(hive_style)) { diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 86b74234eca94..c3bc0bde2616e 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -2553,23 +2553,6 @@ extern "C" SEXP _arrow_dataset___ScannerBuilder__UseThreads(SEXP sb_sexp, SEXP t } #endif -// dataset.cpp -#if defined(ARROW_R_WITH_DATASET) -void dataset___ScannerBuilder__UseAsync(const std::shared_ptr& sb, bool use_async); -extern "C" SEXP _arrow_dataset___ScannerBuilder__UseAsync(SEXP sb_sexp, SEXP use_async_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type sb(sb_sexp); - arrow::r::Input::type use_async(use_async_sexp); - dataset___ScannerBuilder__UseAsync(sb, use_async); - return R_NilValue; -END_CPP11 -} -#else -extern "C" SEXP _arrow_dataset___ScannerBuilder__UseAsync(SEXP sb_sexp, SEXP use_async_sexp){ - Rf_error("Cannot call dataset___ScannerBuilder__UseAsync(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - // dataset.cpp #if defined(ARROW_R_WITH_DATASET) void dataset___ScannerBuilder__BatchSize(const std::shared_ptr& sb, int64_t batch_size); @@ -2725,21 +2708,6 @@ extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){ } #endif -// dataset.cpp -#if defined(ARROW_R_WITH_DATASET) -cpp11::list dataset___ScanTask__get_batches(const std::shared_ptr& scan_task); -extern "C" SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type scan_task(scan_task_sexp); - return cpp11::as_sexp(dataset___ScanTask__get_batches(scan_task)); -END_CPP11 -} -#else -extern "C" SEXP _arrow_dataset___ScanTask__get_batches(SEXP scan_task_sexp){ - Rf_error("Cannot call dataset___ScanTask__get_batches(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - // dataset.cpp #if defined(ARROW_R_WITH_DATASET) void dataset___Dataset__Write(const std::shared_ptr& file_write_options, const std::shared_ptr& filesystem, std::string base_dir, const std::shared_ptr& partitioning, std::string basename_template, const std::shared_ptr& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions); @@ -7438,7 +7406,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___ScannerBuilder__ProjectExprs", (DL_FUNC) &_arrow_dataset___ScannerBuilder__ProjectExprs, 3}, { "_arrow_dataset___ScannerBuilder__Filter", (DL_FUNC) &_arrow_dataset___ScannerBuilder__Filter, 2}, { "_arrow_dataset___ScannerBuilder__UseThreads", (DL_FUNC) &_arrow_dataset___ScannerBuilder__UseThreads, 2}, - { "_arrow_dataset___ScannerBuilder__UseAsync", (DL_FUNC) &_arrow_dataset___ScannerBuilder__UseAsync, 2}, { "_arrow_dataset___ScannerBuilder__BatchSize", (DL_FUNC) &_arrow_dataset___ScannerBuilder__BatchSize, 2}, { "_arrow_dataset___ScannerBuilder__FragmentScanOptions", (DL_FUNC) &_arrow_dataset___ScannerBuilder__FragmentScanOptions, 2}, { "_arrow_dataset___ScannerBuilder__schema", (DL_FUNC) &_arrow_dataset___ScannerBuilder__schema, 1}, @@ -7449,7 +7416,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___Scanner__ToRecordBatchReader", (DL_FUNC) &_arrow_dataset___Scanner__ToRecordBatchReader, 1}, { "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, { "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, - { "_arrow_dataset___ScanTask__get_batches", (DL_FUNC) &_arrow_dataset___ScanTask__get_batches, 1}, { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 8}, { "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2}, { "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 7e0235bf90197..3982af4f7f51d 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -135,7 +135,6 @@ std::shared_ptr ExecNode_Scan( // TODO: pass in FragmentScanOptions auto options = std::make_shared(); - options->use_async = true; options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true); options->dataset_schema = dataset->schema(); diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index a1d24fb51bc10..3eb9acfa4a9ea 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -424,12 +424,6 @@ void dataset___ScannerBuilder__UseThreads(const std::shared_ptrUseThreads(threads)); } -// [[dataset::export]] -void dataset___ScannerBuilder__UseAsync(const std::shared_ptr& sb, - bool use_async) { - StopIfNotOk(sb->UseAsync(use_async)); -} - // [[dataset::export]] void dataset___ScannerBuilder__BatchSize(const std::shared_ptr& sb, int64_t batch_size) { @@ -497,20 +491,6 @@ std::shared_ptr dataset___Scanner__schema( return sc->options()->projected_schema; } -// [[dataset::export]] -cpp11::list dataset___ScanTask__get_batches( - const std::shared_ptr& scan_task) { - arrow::RecordBatchIterator rbi; - rbi = ValueOrStop(scan_task->Execute()); - std::vector> out; - std::shared_ptr batch; - for (auto b : rbi) { - batch = ValueOrStop(b); - out.push_back(batch); - } - return arrow::r::to_r_list(out); -} - // [[dataset::export]] void dataset___Dataset__Write( const std::shared_ptr& file_write_options, diff --git a/r/tests/testthat/test-dataset.R b/r/tests/testthat/test-dataset.R index 58e7458098e28..a0764822ea3d3 100644 --- a/r/tests/testthat/test-dataset.R +++ b/r/tests/testthat/test-dataset.R @@ -601,9 +601,21 @@ test_that("Scanner$ScanBatches", { table <- Table$create(!!!batches) expect_equal(as.data.frame(table), rbind(df1, df2)) - batches <- ds$NewScan()$UseAsync(TRUE)$Finish()$ScanBatches() + batches <- ds$NewScan()$Finish()$ScanBatches() table <- Table$create(!!!batches) expect_equal(as.data.frame(table), rbind(df1, df2)) + + expect_deprecated(ds$NewScan()$UseAsync(TRUE), paste0("The function", + " 'UseAsync' is deprecated and will be removed in a future release.")) + expect_deprecated(ds$NewScan()$UseAsync(FALSE), paste0("The function", + " 'UseAsync' is deprecated and will be removed in a future release.")) + + expect_deprecated(Scanner$create(ds, use_async=TRUE), paste0("The", + " parameter 'use_async' is deprecated and will be removed in a future", + " release.")) + expect_deprecated(Scanner$create(ds, use_async=FALSE), paste0("The", + " parameter 'use_async' is deprecated and will be removed in a future", + " release.")) }) test_that("Scanner$ToRecordBatchReader()", {