diff --git a/kedro-airflow/RELEASE.md b/kedro-airflow/RELEASE.md
index 3e9d35d3e..75e4654e6 100755
--- a/kedro-airflow/RELEASE.md
+++ b/kedro-airflow/RELEASE.md
@@ -1,4 +1,5 @@
 # Upcoming release 0.5.2
+* Replaced references to the `kedro.pipeline.Pipeline` object with the `kedro.pipeline.modular_pipeline.pipeline` factory throughout the test suite.
 
 # Release 0.5.1
 * Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template.
diff --git a/kedro-airflow/tests/test_plugin.py b/kedro-airflow/tests/test_plugin.py
index 48d1fb7b0..77c051ff5 100644
--- a/kedro-airflow/tests/test_plugin.py
+++ b/kedro-airflow/tests/test_plugin.py
@@ -2,7 +2,8 @@
 import pytest
 from kedro.framework.project import pipelines
-from kedro.pipeline import Pipeline, node
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
 
 from kedro_airflow.plugin import commands
 
@@ -30,7 +31,7 @@ def test_create_airflow_dag(
 ):
     """Check the generation and validity of a simple Airflow DAG."""
     dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}.py"
-    mock_pipeline = Pipeline(
+    mock_pipeline = modular_pipeline(
         [
             node(identity, ["input"], ["intermediate"], name="node0"),
             node(identity, ["intermediate"], ["output"], name="node1"),
diff --git a/kedro-datasets/RELEASE.md b/kedro-datasets/RELEASE.md
index de80c50e5..72237defd 100755
--- a/kedro-datasets/RELEASE.md
+++ b/kedro-datasets/RELEASE.md
@@ -1,5 +1,6 @@
 # Upcoming Release:
 
+* Replaced references to the `kedro.pipeline.Pipeline` object with the `kedro.pipeline.modular_pipeline.pipeline` factory throughout the test suite.
 * Relaxed PyArrow range in line with Pandas
diff --git a/kedro-datasets/tests/spark/test_deltatable_dataset.py b/kedro-datasets/tests/spark/test_deltatable_dataset.py
index fe1d49d37..5cbbe62b7 100644
--- a/kedro-datasets/tests/spark/test_deltatable_dataset.py
+++ b/kedro-datasets/tests/spark/test_deltatable_dataset.py
@@ -1,7 +1,8 @@
 import pytest
 from delta import DeltaTable
 from kedro.io import DataCatalog, DataSetError
-from kedro.pipeline import Pipeline, node
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
 from kedro.runner import ParallelRunner
 from pyspark.sql import SparkSession
 from pyspark.sql.types import IntegerType, StringType, StructField, StructType
@@ -80,7 +81,7 @@ def no_output(x):
         delta_ds = DeltaTableDataSet(filepath="")
         catalog = DataCatalog(data_sets={"delta_in": delta_ds})
-        pipeline = Pipeline([node(no_output, "delta_in", None)])
+        pipeline = modular_pipeline([node(no_output, "delta_in", None)])
         pattern = (
             r"The following data sets cannot be used with "
             r"multiprocessing: \['delta_in'\]"
diff --git a/kedro-datasets/tests/spark/test_spark_dataset.py b/kedro-datasets/tests/spark/test_spark_dataset.py
index 4567d6fc9..d02f99bff 100644
--- a/kedro-datasets/tests/spark/test_spark_dataset.py
+++ b/kedro-datasets/tests/spark/test_spark_dataset.py
@@ -8,7 +8,8 @@
 import pytest
 from kedro.io import DataCatalog, DataSetError, Version
 from kedro.io.core import generate_timestamp
-from kedro.pipeline import Pipeline, node
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
 from kedro.runner import ParallelRunner, SequentialRunner
 from moto import mock_s3
 from pyspark.sql import SparkSession
@@ -413,7 +414,7 @@ def test_exists_raises_error(self, mocker):
     def test_parallel_runner(self, is_async, spark_in):
         """Test ParallelRunner with SparkDataSet fails."""
         catalog = DataCatalog(data_sets={"spark_in": spark_in})
-        pipeline = Pipeline([node(identity, "spark_in", "spark_out")])
+        pipeline = modular_pipeline([node(identity, "spark_in", "spark_out")])
         pattern = (
             r"The following data sets cannot be used with "
             r"multiprocessing: \['spark_in'\]"
@@ -949,7 +950,7 @@ def data_catalog(tmp_path):
 class TestDataFlowSequentialRunner:
     def test_spark_load_save(self, is_async, data_catalog):
         """SparkDataSet(load) -> node -> Spark (save)."""
-        pipeline = Pipeline([node(identity, "spark_in", "spark_out")])
+        pipeline = modular_pipeline([node(identity, "spark_in", "spark_out")])
         SequentialRunner(is_async=is_async).run(pipeline, data_catalog)
 
         save_path = Path(data_catalog._data_sets["spark_out"]._filepath.as_posix())
@@ -958,7 +959,7 @@ def test_spark_load_save(self, is_async, data_catalog):
 
     def test_spark_pickle(self, is_async, data_catalog):
         """SparkDataSet(load) -> node -> PickleDataSet (save)"""
-        pipeline = Pipeline([node(identity, "spark_in", "pickle_ds")])
+        pipeline = modular_pipeline([node(identity, "spark_in", "pickle_ds")])
         pattern = ".* was not serialised due to.*"
         with pytest.raises(DataSetError, match=pattern):
             SequentialRunner(is_async=is_async).run(pipeline, data_catalog)
@@ -966,7 +967,7 @@ def test_spark_pickle(self, is_async, data_catalog):
     def test_spark_memory_spark(self, is_async, data_catalog):
         """SparkDataSet(load) -> node -> MemoryDataSet (save and then load)
         -> node -> SparkDataSet (save)"""
-        pipeline = Pipeline(
+        pipeline = modular_pipeline(
             [
                 node(identity, "spark_in", "memory_ds"),
                 node(identity, "memory_ds", "spark_out"),
diff --git a/kedro-telemetry/RELEASE.md b/kedro-telemetry/RELEASE.md
index 595b27147..c8892d6bf 100644
--- a/kedro-telemetry/RELEASE.md
+++ b/kedro-telemetry/RELEASE.md
@@ -1,4 +1,5 @@
 # Upcoming Release 0.2.4
+* Replaced references to the `kedro.pipeline.Pipeline` object with the `kedro.pipeline.modular_pipeline.pipeline` factory throughout the test suite.
 
 # Release 0.2.3
diff --git a/kedro-telemetry/tests/test_plugin.py b/kedro-telemetry/tests/test_plugin.py
index 0da62eb62..26ed0be6e 100644
--- a/kedro-telemetry/tests/test_plugin.py
+++ b/kedro-telemetry/tests/test_plugin.py
@@ -7,7 +7,8 @@
 from kedro.framework.project import pipelines
 from kedro.framework.startup import ProjectMetadata
 from kedro.io import DataCatalog, MemoryDataSet
-from kedro.pipeline import node, pipeline
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
 from pytest import fixture
 
 from kedro_telemetry import __version__ as TELEMETRY_VERSION
@@ -55,7 +56,7 @@ def identity(arg):
 
 @fixture
 def fake_default_pipeline():
-    mock_default_pipeline = pipeline(
+    mock_default_pipeline = modular_pipeline(
         [
             node(identity, ["input"], ["intermediate"], name="node0"),
             node(identity, ["intermediate"], ["output"], name="node1"),
@@ -66,7 +67,7 @@ def fake_sub_pipeline():
 
 @fixture
 def fake_sub_pipeline():
-    mock_sub_pipeline = pipeline(
+    mock_sub_pipeline = modular_pipeline(
         [
             node(identity, ["input"], ["intermediate"], name="node0"),
         ],
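
The change is mechanical across all four plugins: every test that previously instantiated `kedro.pipeline.Pipeline` directly now calls the `pipeline` factory from `kedro.pipeline.modular_pipeline`, imported under the alias `modular_pipeline` so it does not shadow the local `pipeline` variables used inside the tests. A minimal sketch of the pattern follows, with illustrative node names and an `identity` helper mirroring the fixtures in the diff, assuming a Kedro release that ships the modular-pipeline factory (0.18.x):

```python
# Sketch of the migration applied in the test suites above (node names and the
# identity helper are illustrative, mirroring the fixtures in the diff).
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline


def identity(arg):
    return arg


# Before: mock_pipeline = Pipeline([...])
# After:  the factory takes the same list of nodes and returns a Pipeline.
mock_pipeline = modular_pipeline(
    [
        node(identity, ["input"], ["intermediate"], name="node0"),
        node(identity, ["intermediate"], ["output"], name="node1"),
    ]
)

# Nodes come back topologically sorted, exactly as with the Pipeline constructor.
assert [n.name for n in mock_pipeline.nodes] == ["node0", "node1"]
```

Because the factory returns an ordinary `Pipeline` object, the rest of each test (catalog wiring, runner calls, assertions) is left untouched by this diff.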