From 45b0baa1027874507bf3e8d272a79958f894a9cd Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 18 Aug 2023 19:21:23 +0000 Subject: [PATCH 1/3] add parser_kwargs to FileSourceStage --- .../include/morpheus/io/deserializers.hpp | 2 +- .../include/morpheus/stages/file_source.hpp | 9 +++++++-- morpheus/_lib/src/io/deserializers.cpp | 4 ++-- morpheus/_lib/src/stages/file_source.cpp | 20 ++++++++++++++----- morpheus/_lib/stages/__init__.pyi | 2 +- morpheus/_lib/stages/module.cpp | 3 ++- morpheus/_lib/tests/test_file_in_out.cpp | 2 +- morpheus/stages/input/file_source_stage.py | 9 +++++++-- tests/test_file_in_out.py | 19 ++++++++++++++++++ tests/tests_data/simple.json | 3 +++ 10 files changed, 58 insertions(+), 15 deletions(-) create mode 100644 tests/tests_data/simple.json diff --git a/morpheus/_lib/include/morpheus/io/deserializers.hpp b/morpheus/_lib/include/morpheus/io/deserializers.hpp index 2a845a9b5b..a4574ad0d5 100644 --- a/morpheus/_lib/include/morpheus/io/deserializers.hpp +++ b/morpheus/_lib/include/morpheus/io/deserializers.hpp @@ -48,7 +48,7 @@ std::vector get_column_names_from_table(const cudf::io::table_with_ * @param filename : Name of the file that should be loaded into a table * @return cudf::io::table_with_metadata */ -cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type = FileTypes::Auto); +cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type = FileTypes::Auto, std::optional json_lines = std::nullopt); /** * @brief Returns the number of index columns in `data_table`, in practice this will be a `0` or `1` diff --git a/morpheus/_lib/include/morpheus/stages/file_source.hpp b/morpheus/_lib/include/morpheus/stages/file_source.hpp index 248d2112f2..dc5d6cf944 100644 --- a/morpheus/_lib/include/morpheus/stages/file_source.hpp +++ b/morpheus/_lib/include/morpheus/stages/file_source.hpp @@ -32,6 +32,7 @@ #include #include +#include #include #include #include // for vector @@ -63,14 +64,16 @@ class FileSourceStage : public mrc::pymrc::PythonSource json_lines = std::nullopt); private: subscriber_fn_t build(); std::string m_filename; int m_repeat{1}; + std::optional m_json_lines; }; /****** FileSourceStageInterfaceProxy***********************/ @@ -86,12 +89,14 @@ struct FileSourceStageInterfaceProxy * @param name : Name of a stage reference * @param filename : Name of the file from which the messages will be read. * @param repeat : Repeats the input dataset multiple times. Useful to extend small datasets for debugging. + * @param parser_kwargs : Optional arguments to pass to the file parser. * @return std::shared_ptr> */ static std::shared_ptr> init(mrc::segment::Builder& builder, const std::string& name, std::string filename, - int repeat = 1); + int repeat = 1, + pybind11::dict parser_kwargs = pybind11::dict()); }; #pragma GCC visibility pop /** @} */ // end of group diff --git a/morpheus/_lib/src/io/deserializers.cpp b/morpheus/_lib/src/io/deserializers.cpp index cda1a25235..78b6639f20 100644 --- a/morpheus/_lib/src/io/deserializers.cpp +++ b/morpheus/_lib/src/io/deserializers.cpp @@ -53,7 +53,7 @@ std::vector get_column_names_from_table(const cudf::io::table_with_ return foreach_map(table.metadata.schema_info, [](auto schema) { return schema.name; }); } -cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type) +cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type, std::optional json_lines) { if (file_type == FileTypes::Auto) { @@ -65,7 +65,7 @@ cudf::io::table_with_metadata load_table_from_file(const std::string& filename, switch (file_type) { case FileTypes::JSON: { - auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(true); + auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true)); table = cudf::io::read_json(options.build()); break; } diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp index 0bceb33623..ea1a16439d 100644 --- a/morpheus/_lib/src/stages/file_source.cpp +++ b/morpheus/_lib/src/stages/file_source.cpp @@ -25,6 +25,7 @@ #include "pymrc/node.hpp" #include "morpheus/io/deserializers.hpp" +#include "morpheus/objects/file_types.hpp" #include "morpheus/objects/table_info.hpp" #include "morpheus/utilities/cudf_util.hpp" @@ -37,6 +38,7 @@ #include #include +#include #include #include // IWYU thinks we need __alloc_traits<>::value_type for vector assignments @@ -45,16 +47,17 @@ namespace morpheus { // Component public implementations // ************ FileSourceStage ************* // -FileSourceStage::FileSourceStage(std::string filename, int repeat) : +FileSourceStage::FileSourceStage(std::string filename, int repeat, std::optional json_lines) : PythonSource(build()), m_filename(std::move(filename)), - m_repeat(repeat) + m_repeat(repeat), + m_json_lines(json_lines) {} FileSourceStage::subscriber_fn_t FileSourceStage::build() { return [this](rxcpp::subscriber output) { - auto data_table = load_table_from_file(m_filename); + auto data_table = load_table_from_file(m_filename, FileTypes::Auto, m_json_lines); int index_col_count = prepare_df_index(data_table); // Next, create the message metadata. This gets reused for repeats @@ -112,9 +115,16 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build() // ************ FileSourceStageInterfaceProxy ************ // std::shared_ptr> FileSourceStageInterfaceProxy::init( - mrc::segment::Builder& builder, const std::string& name, std::string filename, int repeat) + mrc::segment::Builder& builder, const std::string& name, std::string filename, int repeat, pybind11::dict parser_kwargs) { - auto stage = builder.construct_object(name, filename, repeat); + std::optional json_lines = std::nullopt; + + if (parser_kwargs.contains("lines")) + { + json_lines = parser_kwargs["lines"].cast(); + } + + auto stage = builder.construct_object(name, filename, repeat, json_lines); return stage; } diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi index b62fba68e4..6c0679dede 100644 --- a/morpheus/_lib/stages/__init__.pyi +++ b/morpheus/_lib/stages/__init__.pyi @@ -40,7 +40,7 @@ class DeserializeStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True) -> None: ... pass class FileSourceStage(mrc.core.segment.SegmentObject): - def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int) -> None: ... + def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, parser_kwargs: dict) -> None: ... pass class FilterDetectionsStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, threshold: float, copy: bool, filter_source: morpheus._lib.common.FilterSource, field_name: str = 'probs') -> None: ... diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index caac2e9246..a173359259 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -105,7 +105,8 @@ PYBIND11_MODULE(stages, _module) py::arg("builder"), py::arg("name"), py::arg("filename"), - py::arg("repeat")); + py::arg("repeat"), + py::arg("parser_kwargs")); py::class_, mrc::segment::ObjectProperties, diff --git a/morpheus/_lib/tests/test_file_in_out.cpp b/morpheus/_lib/tests/test_file_in_out.cpp index 612d074330..6f8a89c685 100644 --- a/morpheus/_lib/tests/test_file_in_out.cpp +++ b/morpheus/_lib/tests/test_file_in_out.cpp @@ -90,7 +90,7 @@ TEST_F(TestFileInOut, RoundTripCSV) } } -TEST_F(TestFileInOut, RoundTripJSON) +TEST_F(TestFileInOut, RoundTripJSONLines) { using nlohmann::json; auto input_file = test::get_morpheus_root() / "tests/tests_data/filter_probs.jsonlines"; diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index 8a44397500..7bc2879d12 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -57,6 +57,8 @@ class FileSourceStage(PreallocatorMixin, SingleOutputSource): filter_null : bool, default = True Whether or not to filter rows with null 'data' column. Null values in the 'data' column can cause issues down the line with processing. Setting this to True is recommended. + parser_kwargs : dict, default = {} + Extra options to pass to the file parser. """ def __init__(self, @@ -65,7 +67,8 @@ def __init__(self, iterative: bool = False, file_type: FileTypes = FileTypes.Auto, repeat: int = 1, - filter_null: bool = True): + filter_null: bool = True, + parser_kwargs: dict = {}): super().__init__(c) @@ -74,6 +77,7 @@ def __init__(self, self._filename = filename self._file_type = file_type self._filter_null = filter_null + self._parser_kwargs = parser_kwargs self._input_count = None self._max_concurrent = c.num_threads @@ -101,7 +105,7 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair: if self._build_cpp_node(): import morpheus._lib.stages as _stages - out_stream = _stages.FileSourceStage(builder, self.unique_name, self._filename, self._repeat_count) + out_stream = _stages.FileSourceStage(builder, self.unique_name, self._filename, self._repeat_count, self._parser_kwargs) else: out_stream = builder.make_source(self.unique_name, self._generate_frames()) @@ -115,6 +119,7 @@ def _generate_frames(self) -> typing.Iterable[MessageMeta]: self._filename, self._file_type, filter_nulls=self._filter_null, + parser_kwargs=self._parser_kwargs, df_type="cudf", ) diff --git a/tests/test_file_in_out.py b/tests/test_file_in_out.py index a52549de66..c88faca867 100755 --- a/tests/test_file_in_out.py +++ b/tests/test_file_in_out.py @@ -31,6 +31,7 @@ from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from utils import TEST_DIRS from utils import assert_path_exists from utils.dataset_manager import DatasetManager @@ -76,6 +77,24 @@ def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: assert output_data.tolist() == validation_data.tolist() +def test_file_read_json(config): + src_file = os.path.join(TEST_DIRS.tests_data_dir, "simple.json") + + pipe = LinearPipeline(config) + pipe.set_source(FileSourceStage(config, filename=src_file, parser_kwargs={ "lines": False })) + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + pipe.run() + + messages = sink_stage.get_messages() + + assert(len(messages) == 1) + + meta = messages[0] + + assert(len(meta.df) == 4) + assert(len(meta.df.columns) == 3) + + @pytest.mark.slow @pytest.mark.use_python @pytest.mark.usefixtures("chdir_tmpdir") diff --git a/tests/tests_data/simple.json b/tests/tests_data/simple.json new file mode 100644 index 0000000000..3a492210d4 --- /dev/null +++ b/tests/tests_data/simple.json @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:29679e130a150265fbadb79367714bf5fcc4a6f845b388bae992e5a160deb2a7 +size 391 From ac0b7642448122f05db4c70ce35459f199e90598 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 18 Aug 2023 21:26:16 +0000 Subject: [PATCH 2/3] fix styles --- morpheus/_lib/include/morpheus/io/deserializers.hpp | 5 ++++- morpheus/_lib/include/morpheus/stages/file_source.hpp | 3 ++- morpheus/_lib/src/io/deserializers.cpp | 7 +++++-- morpheus/_lib/src/stages/file_source.cpp | 7 ++++++- morpheus/stages/input/file_source_stage.py | 6 +++++- tests/test_file_in_out.py | 10 +++++----- 6 files changed, 27 insertions(+), 11 deletions(-) diff --git a/morpheus/_lib/include/morpheus/io/deserializers.hpp b/morpheus/_lib/include/morpheus/io/deserializers.hpp index a4574ad0d5..05910fa857 100644 --- a/morpheus/_lib/include/morpheus/io/deserializers.hpp +++ b/morpheus/_lib/include/morpheus/io/deserializers.hpp @@ -22,6 +22,7 @@ #include #include // for pybind11::object +#include #include #include @@ -48,7 +49,9 @@ std::vector get_column_names_from_table(const cudf::io::table_with_ * @param filename : Name of the file that should be loaded into a table * @return cudf::io::table_with_metadata */ -cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type = FileTypes::Auto, std::optional json_lines = std::nullopt); +cudf::io::table_with_metadata load_table_from_file(const std::string& filename, + FileTypes file_type = FileTypes::Auto, + std::optional json_lines = std::nullopt); /** * @brief Returns the number of index columns in `data_table`, in practice this will be a `0` or `1` diff --git a/morpheus/_lib/include/morpheus/stages/file_source.hpp b/morpheus/_lib/include/morpheus/stages/file_source.hpp index dc5d6cf944..4e6fb2f541 100644 --- a/morpheus/_lib/include/morpheus/stages/file_source.hpp +++ b/morpheus/_lib/include/morpheus/stages/file_source.hpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include // for apply, make_subscriber, observable_member, is_on_error<>::not_void, is_on_next_of<>::not_void, trace_activity @@ -95,7 +96,7 @@ struct FileSourceStageInterfaceProxy static std::shared_ptr> init(mrc::segment::Builder& builder, const std::string& name, std::string filename, - int repeat = 1, + int repeat = 1, pybind11::dict parser_kwargs = pybind11::dict()); }; #pragma GCC visibility pop diff --git a/morpheus/_lib/src/io/deserializers.cpp b/morpheus/_lib/src/io/deserializers.cpp index 78b6639f20..0cd129f191 100644 --- a/morpheus/_lib/src/io/deserializers.cpp +++ b/morpheus/_lib/src/io/deserializers.cpp @@ -53,7 +53,9 @@ std::vector get_column_names_from_table(const cudf::io::table_with_ return foreach_map(table.metadata.schema_info, [](auto schema) { return schema.name; }); } -cudf::io::table_with_metadata load_table_from_file(const std::string& filename, FileTypes file_type, std::optional json_lines) +cudf::io::table_with_metadata load_table_from_file(const std::string& filename, + FileTypes file_type, + std::optional json_lines) { if (file_type == FileTypes::Auto) { @@ -65,7 +67,8 @@ cudf::io::table_with_metadata load_table_from_file(const std::string& filename, switch (file_type) { case FileTypes::JSON: { - auto options = cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true)); + auto options = + cudf::io::json_reader_options::builder(cudf::io::source_info{filename}).lines(json_lines.value_or(true)); table = cudf::io::read_json(options.build()); break; } diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp index ea1a16439d..0bb9870fea 100644 --- a/morpheus/_lib/src/stages/file_source.cpp +++ b/morpheus/_lib/src/stages/file_source.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include // for str_attr_accessor #include // for pybind11::int_ @@ -115,7 +116,11 @@ FileSourceStage::subscriber_fn_t FileSourceStage::build() // ************ FileSourceStageInterfaceProxy ************ // std::shared_ptr> FileSourceStageInterfaceProxy::init( - mrc::segment::Builder& builder, const std::string& name, std::string filename, int repeat, pybind11::dict parser_kwargs) + mrc::segment::Builder& builder, + const std::string& name, + std::string filename, + int repeat, + pybind11::dict parser_kwargs) { std::optional json_lines = std::nullopt; diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index 7bc2879d12..0d64001a0a 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -105,7 +105,11 @@ def _build_source(self, builder: mrc.Builder) -> StreamPair: if self._build_cpp_node(): import morpheus._lib.stages as _stages - out_stream = _stages.FileSourceStage(builder, self.unique_name, self._filename, self._repeat_count, self._parser_kwargs) + out_stream = _stages.FileSourceStage(builder, + self.unique_name, + self._filename, + self._repeat_count, + self._parser_kwargs) else: out_stream = builder.make_source(self.unique_name, self._generate_frames()) diff --git a/tests/test_file_in_out.py b/tests/test_file_in_out.py index c88faca867..f9ac097924 100755 --- a/tests/test_file_in_out.py +++ b/tests/test_file_in_out.py @@ -28,10 +28,10 @@ from morpheus.messages import MultiMessage from morpheus.pipeline import LinearPipeline from morpheus.stages.input.file_source_stage import FileSourceStage +from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from morpheus.stages.output.write_to_file_stage import WriteToFileStage from morpheus.stages.postprocess.serialize_stage import SerializeStage from morpheus.stages.preprocess.deserialize_stage import DeserializeStage -from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage from utils import TEST_DIRS from utils import assert_path_exists from utils.dataset_manager import DatasetManager @@ -81,18 +81,18 @@ def test_file_read_json(config): src_file = os.path.join(TEST_DIRS.tests_data_dir, "simple.json") pipe = LinearPipeline(config) - pipe.set_source(FileSourceStage(config, filename=src_file, parser_kwargs={ "lines": False })) + pipe.set_source(FileSourceStage(config, filename=src_file, parser_kwargs={"lines": False})) sink_stage = pipe.add_stage(InMemorySinkStage(config)) pipe.run() messages = sink_stage.get_messages() - assert(len(messages) == 1) + assert (len(messages) == 1) meta = messages[0] - assert(len(meta.df) == 4) - assert(len(meta.df.columns) == 3) + assert (len(meta.df) == 4) + assert (len(meta.df.columns) == 3) @pytest.mark.slow From 203e1f8c43767ee93496f9936da919caa3cac790 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Sat, 19 Aug 2023 03:05:06 +0000 Subject: [PATCH 3/3] fix pylint warnings --- morpheus/stages/input/file_source_stage.py | 4 ++-- tests/test_file_in_out.py | 18 +++++++++--------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/morpheus/stages/input/file_source_stage.py b/morpheus/stages/input/file_source_stage.py index 0d64001a0a..d6c396a660 100644 --- a/morpheus/stages/input/file_source_stage.py +++ b/morpheus/stages/input/file_source_stage.py @@ -68,7 +68,7 @@ def __init__(self, file_type: FileTypes = FileTypes.Auto, repeat: int = 1, filter_null: bool = True, - parser_kwargs: dict = {}): + parser_kwargs: dict = None): super().__init__(c) @@ -77,7 +77,7 @@ def __init__(self, self._filename = filename self._file_type = file_type self._filter_null = filter_null - self._parser_kwargs = parser_kwargs + self._parser_kwargs = parser_kwargs or {} self._input_count = None self._max_concurrent = c.num_threads diff --git a/tests/test_file_in_out.py b/tests/test_file_in_out.py index f9ac097924..61e7d10c6d 100755 --- a/tests/test_file_in_out.py +++ b/tests/test_file_in_out.py @@ -43,9 +43,9 @@ @pytest.mark.parametrize("flush", [False, True], ids=["no_flush", "flush"]) @pytest.mark.parametrize("repeat", [1, 2, 5], ids=["repeat1", "repeat2", "repeat5"]) def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: int): - input_file = os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.{}'.format(input_type)) + input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}') validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") - out_file = os.path.join(tmp_path, 'results.{}'.format(output_type)) + out_file = os.path.join(tmp_path, f'results.{output_type}') pipe = LinearPipeline(config) pipe.set_source(FileSourceStage(config, filename=input_file, repeat=repeat)) @@ -63,7 +63,7 @@ def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: # The output data will contain an additional id column that we will need to slice off output_data = np.loadtxt(out_file, delimiter=",", skiprows=1) output_data = output_data[:, 1:] - elif output_type == "json" or output_type == "jsonlines": # assume json + elif output_type in ("json", "jsonlines"): # assume json df = read_file_to_df(out_file, file_type=FileTypes.Auto) output_data = df.values elif output_type == "parquet": @@ -120,9 +120,9 @@ def test_to_file_no_path(tmp_path, config): @pytest.mark.parametrize("input_type", ["csv", "jsonlines", "parquet"]) @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"]) def test_file_rw_multi_segment_pipe(tmp_path, config, input_type, output_type): - input_file = os.path.join(TEST_DIRS.tests_data_dir, 'filter_probs.{}'.format(input_type)) + input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}') validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") - out_file = os.path.join(tmp_path, 'results.{}'.format(output_type)) + out_file = os.path.join(tmp_path, f'results.{output_type}') if (input_type == "parquet"): CppConfig.set_should_use_cpp(False) @@ -182,10 +182,10 @@ def test_file_rw_index_pipe(tmp_path, config, input_file): "include_header": True }), (os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.jsonlines"), {})], ids=["CSV", "CSV_ID", "JSON"]) -def test_file_roundtrip(use_cpp, tmp_path, input_file, extra_kwargs): +def test_file_roundtrip(tmp_path, input_file, extra_kwargs): # Output file should be same type as input - out_file = os.path.join(tmp_path, 'results{}'.format(os.path.splitext(input_file)[1])) + out_file = os.path.join(tmp_path, f'results{os.path.splitext(input_file)[1]}') # Read the dataframe df = read_file_to_df(input_file, df_type='cudf') @@ -223,7 +223,7 @@ def test_read_cpp_compare(input_file: str): @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"]) def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type): input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") - out_file = os.path.join(tmp_path, 'results.{}'.format(output_type)) + out_file = os.path.join(tmp_path, f'results.{output_type}') pipe = LinearPipeline(config) pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False)) @@ -253,7 +253,7 @@ def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type): @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"]) def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path, config, output_type): input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") - out_file = os.path.join(tmp_path, 'results.{}'.format(output_type)) + out_file = os.path.join(tmp_path, f'results.{output_type}') pipe = LinearPipeline(config) pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))