Skip to content

Commit

Permalink
Detect pipeline attribute during compile/run (#398)
Browse files Browse the repository at this point in the history
Addresses #375
  • Loading branch information
PhilippeMoussalli authored Sep 13, 2023
1 parent b72e252 commit c23189b
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 85 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ For more advanced use cases, you can use the `DaskTransformComponent` instead.
Once you have a pipeline you can easily run (and compile) it by using the built-in CLI:

```bash
fondant run foo.bar:pipeline --local
fondant run pipeline.py --local
```

To see all available arguments you can check the fondant CLI help pages
Expand Down
6 changes: 3 additions & 3 deletions docs/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ Fondant has a feature rich CLI that helps you with these steps. Let's start by r
First of all make sure you have [Docker Compose](https://docs.docker.com/compose/) installed on your system

```bash
fondant run pipeline:my_pipeline --local
fondant run pipeline.py --local
```

We call the fondant CLI to compile and run our pipeline, we pass a reference to our pipeline using the import_string syntax `<module>:<instance>`. We also pass the `--local` flag to indicate we want to compile our pipeline for the local runner.
We call the fondant CLI to compile and run our pipeline, passing the module that contains the pipeline instance; the instance itself is detected automatically. We also pass the `--local` flag to indicate we want to compile our pipeline for the local runner.
Running this command will create a `docker-compose.yml` file with the compiled pipeline definition. Feel free to inspect this file but changing it is not needed.

Note that if you use a local `base_path` in your pipeline declaration that this path will be mounted in the docker containers. This means that the data will be stored locally on your machine. If you use a cloud storage path, the data will be stored in the cloud.
Expand Down Expand Up @@ -287,7 +287,7 @@ We add the component to our pipeline definition and specify that it depends on t
We can now easily run our new pipeline:

```bash
fondant run pipeline:my_pipeline --local
fondant run pipeline.py --local
```

You will see that the components runs sequentially and that each has its own logs.
Expand Down
115 changes: 59 additions & 56 deletions src/fondant/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import textwrap
import typing as t
from collections import defaultdict
from types import ModuleType

from fondant.compiler import DockerCompiler, KubeFlowCompiler
from fondant.component import BaseComponent, Component
Expand Down Expand Up @@ -52,7 +53,7 @@ def entrypoint():
This CLI is used to interact with fondant pipelines like compiling and running your pipelines.
Example:
fondant compile my_project.my_pipeline.py:pipeline
fondant compile my_project.my_pipeline.py
""",
),
epilog=textwrap.dedent(
Expand Down Expand Up @@ -208,16 +209,18 @@ def register_compile(parent_parser):
- to mount cloud credentials (see examples))
Example:
fondant compile my_project.my_pipeline.py:pipeline --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials
fondant compile my_project.my_pipeline.py --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials
fondant compile my_project.my_pipeline.py:pipeline --kubeflow --extra-volumes $HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json
fondant compile my_project.my_pipeline.py --kubeflow --extra-volumes $HOME/.config/gcloud/application_default_credentials.json:/root/.config/gcloud/application_default_credentials.json
""",
),
)
parser.add_argument(
"pipeline",
help="Path to the fondant pipeline: path.to.module:instance",
type=pipeline_from_string,
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
action="store",
)
# add a mutually exclusive group for the mode
mode_group = parser.add_mutually_exclusive_group(required=True)
Expand Down Expand Up @@ -247,17 +250,19 @@ def register_compile(parent_parser):

def compile(args):
args = set_default_output(args)
pipeline = pipeline_from_module(args.ref)

if args.local:
compiler = DockerCompiler()
compiler.compile(
pipeline=args.pipeline,
pipeline=pipeline,
extra_volumes=args.extra_volumes,
output_path=args.output_path,
build_args=args.build_arg,
)
elif args.kubeflow:
compiler = KubeFlowCompiler()
compiler.compile(pipeline=args.pipeline, output_path=args.output_path)
compiler.compile(pipeline=pipeline, output_path=args.output_path)


def register_run(parent_parser):
Expand All @@ -274,15 +279,16 @@ def register_run(parent_parser):
You can use the --extra-volumes flag to specify extra volumes to mount in the containers this can be used:
Example:
fondant run my_project.my_pipeline.py:pipeline --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials
fondant run my_project.my_pipeline.py --local --extra-volumes $HOME/.aws/credentials:/root/.aws/credentials
fondant run ./my_compiled_kubeflow_pipeline.tgz --kubeflow
""",
),
)
parser.add_argument(
"ref",
help="""Reference to the pipeline to run, can be a path to a spec file or
a pipeline instance that will be compiled first""",
a module containing the pipeline instance that will be compiled first (e.g. pipeline.py)
""",
action="store",
)
# add a mutually exclusive group for the mode
Expand Down Expand Up @@ -315,8 +321,8 @@ def run(args):

if args.local:
try:
pipeline = pipeline_from_string(args.ref)
except ImportFromStringError:
pipeline = pipeline_from_module(args.ref)
except ModuleNotFoundError:
spec_ref = args.ref
else:
spec_ref = args.output_path
Expand All @@ -338,8 +344,8 @@ def run(args):
msg = "--host argument is required for running on Kubeflow"
raise ValueError(msg)
try:
pipeline = pipeline_from_string(args.ref)
except ImportFromStringError:
pipeline = pipeline_from_module(args.ref)
except ModuleNotFoundError:
spec_ref = args.ref
else:
spec_ref = args.output_path
Expand Down Expand Up @@ -389,62 +395,59 @@ def execute(args):
executor.execute(component)


class ImportFromStringError(Exception):
class ComponentImportError(Exception):
    """Raised when no single Component class can be imported from the referenced module."""


class ImportFromModuleError(Exception):
class PipelineImportError(Exception):
    """Raised when no single Pipeline instance can be found in the referenced module."""


def pipeline_from_string(import_string: str) -> Pipeline:
"""Try to import a pipeline from a string otherwise raise an ImportFromStringError."""
module_str, _, attr_str = import_string.rpartition(":")
if not attr_str or not module_str:
raise ImportFromStringError(
f"{import_string} is not a valid import string."
+ "Please provide a valid import string in the format of module:attr",
)
def get_module(module_str: str) -> ModuleType:
"""Function that retrieves module from a module string."""
if ".py" in module_str:
module_str = module_str.rsplit(".py", 1)[0]

module_str = module_str.replace("/", ".")

try:
module = importlib.import_module(module_str)
except ImportError:
msg = f"{module_str} is not a valid module. Please provide a valid module."
raise ImportFromStringError(
msg,
)
except ModuleNotFoundError:
msg = f"`{module_str}` was not found. Please provide a valid module."
raise ModuleNotFoundError(msg)

try:
for attr_str_element in attr_str.split("."):
instance = getattr(module, attr_str_element)
except AttributeError:
msg = f"{attr_str} is not found in {module}."
raise ImportFromStringError(msg)
return module

if not isinstance(instance, Pipeline):
msg = f"{module}:{instance} is not a valid pipeline."
raise ImportFromStringError(msg)

return instance
def pipeline_from_module(module_str: str) -> Pipeline:
"""Try to import a pipeline from a string otherwise raise an ImportFromStringError."""
module = get_module(module_str)

pipeline_instances = [
obj for obj in module.__dict__.values() if isinstance(obj, Pipeline)
]

def component_from_module(module_str: str) -> t.Type[Component]:
"""Try to import a component from a module otherwise raise an ImportFromModuleError."""
if ".py" in module_str:
module_str = module_str.rsplit(".py", 1)[0]

module_str = module_str.replace("/", ".")
if not pipeline_instances:
msg = f"No pipeline found in module {module_str}"
raise PipelineImportError(msg)

try:
class_members = inspect.getmembers(
importlib.import_module(module_str),
inspect.isclass,
)
except ModuleNotFoundError:
msg = f"`{module_str}` was not found. Please provide a valid module."
raise ImportFromModuleError(
msg,
if len(pipeline_instances) > 1:
msg = (
f"Found multiple instantiated pipelines in {module_str}. Only one pipeline "
f"can be present"
)
raise PipelineImportError(msg)

pipeline = pipeline_instances[0]
logger.info(f"Pipeline `{pipeline.name}` found in module {module_str}")

return pipeline


def component_from_module(module_str: str) -> t.Type[Component]:
"""Try to import a component from a module otherwise raise an ImportFromModuleError."""
module = get_module(module_str)
class_members = inspect.getmembers(module, inspect.isclass)

component_classes_dict = defaultdict(list)

Expand All @@ -455,7 +458,7 @@ def component_from_module(module_str: str) -> t.Type[Component]:

if len(component_classes_dict) == 0:
msg = f"No Component found in module {module_str}"
raise ImportFromModuleError(msg)
raise ComponentImportError(msg)

max_order = max(component_classes_dict)
found_components = component_classes_dict[max_order]
Expand All @@ -465,7 +468,7 @@ def component_from_module(module_str: str) -> t.Type[Component]:
f"Found multiple components in {module_str}: {found_components}. Only one component "
f"can be present"
)
raise ImportFromModuleError(msg)
raise ComponentImportError(msg)

component_name, component_cls = found_components[0]
logger.info(f"Component `{component_name}` found in module {module_str}")
Expand Down
File renamed without changes.
4 changes: 4 additions & 0 deletions tests/example_modules/invalid_double_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from fondant.pipeline import Pipeline

# Test fixture: this module deliberately defines TWO Pipeline instances at
# module level, so that CLI pipeline auto-detection can be tested to reject
# modules containing more than one instantiated pipeline.
TEST_PIPELINE = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
TEST_PIPELINE_2 = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
3 changes: 3 additions & 0 deletions tests/example_modules/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from fondant.pipeline import Pipeline

# Test fixture: a module with exactly one Pipeline instance, used to verify
# that the CLI can automatically detect the pipeline from a module reference.
pipeline = Pipeline(pipeline_name="test_pipeline", base_path="some/path")
Loading

0 comments on commit c23189b

Please sign in to comment.