Replace kedro.pipeline with modular_pipeline.pipeline factory (#99)
* Add non-Spark-related test changes.

Replace kedro.pipeline.Pipeline with the
kedro.pipeline.modular_pipeline.pipeline factory, for symmetry with
changes made to the main kedro library.

Signed-off-by: Adam Farley <adamfrly@gmail.com>
adamfrly authored Feb 1, 2023
1 parent 5b43674 commit a13cb39
Showing 7 changed files with 19 additions and 12 deletions.
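
The same pattern is applied in every test file: instead of instantiating the
Pipeline class directly, the tests call the pipeline factory, which returns an
equivalent Pipeline object. A minimal sketch of the before/after, assuming the
kedro 0.18.x API (the identity helper below is illustrative, mirroring the
test fixtures, and is not part of this commit):

    # Sketch of the replacement pattern, assuming kedro 0.18.x.
    from kedro.pipeline import node
    from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline


    def identity(arg):
        return arg


    # Before: the Pipeline class constructor built the object directly.
    #     from kedro.pipeline import Pipeline
    #     mock_pipeline = Pipeline([node(identity, "in", "out", name="node0")])
    # After: the factory builds an equivalent Pipeline instance, so the
    # tests behave the same.
    mock_pipeline = modular_pipeline(
        [node(identity, "in", "out", name="node0")]
    )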
1 change: 1 addition & 0 deletions kedro-airflow/RELEASE.md
@@ -1,4 +1,5 @@
# Upcoming release 0.5.2
+* Replace references to the `kedro.pipeline.Pipeline` object throughout the test suite with the `kedro.pipeline.modular_pipeline.pipeline` factory.

# Release 0.5.1
* Added additional CLI argument `--jinja-file` to provide a path to a custom Jinja2 template.
5 changes: 3 additions & 2 deletions kedro-airflow/tests/test_plugin.py
@@ -2,7 +2,8 @@

import pytest
from kedro.framework.project import pipelines
-from kedro.pipeline import Pipeline, node
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro_airflow.plugin import commands


@@ -30,7 +31,7 @@ def test_create_airflow_dag(
):
    """Check the generation and validity of a simple Airflow DAG."""
    dag_file = Path.cwd() / "airflow_dags" / f"{dag_name}.py"
-    mock_pipeline = Pipeline(
+    mock_pipeline = modular_pipeline(
        [
            node(identity, ["input"], ["intermediate"], name="node0"),
            node(identity, ["intermediate"], ["output"], name="node1"),
1 change: 1 addition & 0 deletions kedro-datasets/RELEASE.md
@@ -1,5 +1,6 @@

# Upcoming Release:
+* Replace references to the `kedro.pipeline.Pipeline` object throughout the test suite with the `kedro.pipeline.modular_pipeline.pipeline` factory.

* Relaxed PyArrow range in line with Pandas

5 changes: 3 additions & 2 deletions kedro-datasets/tests/spark/test_deltatable_dataset.py
@@ -1,7 +1,8 @@
import pytest
from delta import DeltaTable
from kedro.io import DataCatalog, DataSetError
-from kedro.pipeline import Pipeline, node
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.runner import ParallelRunner
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType
@@ -80,7 +81,7 @@ def no_output(x):

        delta_ds = DeltaTableDataSet(filepath="")
        catalog = DataCatalog(data_sets={"delta_in": delta_ds})
-        pipeline = Pipeline([node(no_output, "delta_in", None)])
+        pipeline = modular_pipeline([node(no_output, "delta_in", None)])
        pattern = (
            r"The following data sets cannot be used with "
            r"multiprocessing: \['delta_in'\]"
11 changes: 6 additions & 5 deletions kedro-datasets/tests/spark/test_spark_dataset.py
@@ -8,7 +8,8 @@
import pytest
from kedro.io import DataCatalog, DataSetError, Version
from kedro.io.core import generate_timestamp
-from kedro.pipeline import Pipeline, node
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.runner import ParallelRunner, SequentialRunner
from moto import mock_s3
from pyspark.sql import SparkSession
@@ -413,7 +414,7 @@ def test_exists_raises_error(self, mocker):
    def test_parallel_runner(self, is_async, spark_in):
        """Test ParallelRunner with SparkDataSet fails."""
        catalog = DataCatalog(data_sets={"spark_in": spark_in})
-        pipeline = Pipeline([node(identity, "spark_in", "spark_out")])
+        pipeline = modular_pipeline([node(identity, "spark_in", "spark_out")])
        pattern = (
            r"The following data sets cannot be used with "
            r"multiprocessing: \['spark_in'\]"
@@ -949,7 +950,7 @@ def data_catalog(tmp_path):
class TestDataFlowSequentialRunner:
    def test_spark_load_save(self, is_async, data_catalog):
        """SparkDataSet(load) -> node -> Spark (save)."""
-        pipeline = Pipeline([node(identity, "spark_in", "spark_out")])
+        pipeline = modular_pipeline([node(identity, "spark_in", "spark_out")])
        SequentialRunner(is_async=is_async).run(pipeline, data_catalog)

        save_path = Path(data_catalog._data_sets["spark_out"]._filepath.as_posix())
@@ -958,15 +959,15 @@ def test_spark_load_save(self, is_async, data_catalog):

    def test_spark_pickle(self, is_async, data_catalog):
        """SparkDataSet(load) -> node -> PickleDataSet (save)"""
-        pipeline = Pipeline([node(identity, "spark_in", "pickle_ds")])
+        pipeline = modular_pipeline([node(identity, "spark_in", "pickle_ds")])
        pattern = ".* was not serialised due to.*"
        with pytest.raises(DataSetError, match=pattern):
            SequentialRunner(is_async=is_async).run(pipeline, data_catalog)

    def test_spark_memory_spark(self, is_async, data_catalog):
        """SparkDataSet(load) -> node -> MemoryDataSet (save and then load) ->
        node -> SparkDataSet (save)"""
-        pipeline = Pipeline(
+        pipeline = modular_pipeline(
            [
                node(identity, "spark_in", "memory_ds"),
                node(identity, "memory_ds", "spark_out"),
1 change: 1 addition & 0 deletions kedro-telemetry/RELEASE.md
@@ -1,4 +1,5 @@
# Upcoming Release 0.2.4
+* Replace references to the `kedro.pipeline.Pipeline` object throughout the test suite with the `kedro.pipeline.modular_pipeline.pipeline` factory.

# Release 0.2.3

7 changes: 4 additions & 3 deletions kedro-telemetry/tests/test_plugin.py
@@ -7,7 +7,8 @@
from kedro.framework.project import pipelines
from kedro.framework.startup import ProjectMetadata
from kedro.io import DataCatalog, MemoryDataSet
-from kedro.pipeline import node, pipeline
+from kedro.pipeline import node
+from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from pytest import fixture

from kedro_telemetry import __version__ as TELEMETRY_VERSION
@@ -55,7 +56,7 @@ def identity(arg):

@fixture
def fake_default_pipeline():
-    mock_default_pipeline = pipeline(
+    mock_default_pipeline = modular_pipeline(
        [
            node(identity, ["input"], ["intermediate"], name="node0"),
            node(identity, ["intermediate"], ["output"], name="node1"),
@@ -66,7 +67,7 @@ def fake_default_pipeline():

@fixture
def fake_sub_pipeline():
-    mock_sub_pipeline = pipeline(
+    mock_sub_pipeline = modular_pipeline(
        [
            node(identity, ["input"], ["intermediate"], name="node0"),
        ],
