feat: use pydoclint as a documentation linter #212

Merged (15 commits) on Oct 31, 2023
9 changes: 8 additions & 1 deletion .pre-commit-config.yaml
@@ -71,8 +71,15 @@ repos:
rev: 6.3.0
hooks:
- id: pydocstyle
args: [--convention=google]
args: [--convention=google, --add-ignore=D107]

- repo: https://github.com/lovesegfault/beautysh
rev: v6.2.1
hooks:
- id: beautysh

- repo: https://github.com/jsh9/pydoclint
rev: 0.3.8
hooks:
- id: pydoclint
args: [--style=google, --check-return-types=True, --config=pyproject.toml src]
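For context, the new pydoclint hook cross-checks each docstring against the function signature, while the extra --add-ignore=D107 tells pydocstyle to stop requiring a separate __init__ docstring (constructor arguments move to the class docstring instead, as in the LiftOverSpark change further down). A hypothetical function, not taken from this repository, that satisfies both hooks under the Google convention could look like this:

# Hypothetical example: pydoclint with --check-return-types=True requires
# every argument to be documented with its type and the Returns type to
# match the return annotation.


def count_rows(path: str, header: bool = True) -> int:
    """Count the data rows in a CSV file.

    Args:
        path (str): Path to the CSV file.
        header (bool): Whether the first line is a header. Defaults to True.

    Returns:
        int: Number of data rows in the file.
    """
    with open(path) as handle:
        total = sum(1 for _ in handle)
    return total - 1 if header else total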
5 changes: 4 additions & 1 deletion Makefile
@@ -21,8 +21,11 @@ setup-dev: ## Setup development environment
@. utils/install_dependencies.sh

check: ## Lint and format code
@echo "Linting..."
@echo "Linting API..."
@poetry run ruff src/otg .
@echo "Linting docstrings..."
@poetry run pydoclint --config=pyproject.toml src
@poetry run pydoclint --config=pyproject.toml --skip-checking-short-docstrings=true tests
@echo "Formatting..."
@poetry run black src/otg .
@poetry run isort src/otg .
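The Makefile now runs pydoclint twice: strictly on src, and with --skip-checking-short-docstrings=true on tests, so terse one-line test docstrings are not forced to carry full Args/Returns sections. An illustrative test, not from the repository, that stays valid under the relaxed invocation:

# Illustrative sketch: with --skip-checking-short-docstrings=true, pydoclint
# does not demand Args or Returns sections for a docstring that fits on a
# single short line, which keeps minimal test docstrings like this one valid.


def test_addition_is_commutative() -> None:
    """Addition gives the same result regardless of operand order."""
    assert 1 + 2 == 2 + 1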
7,337 changes: 3,807 additions & 3,530 deletions poetry.lock

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions pyproject.toml
@@ -61,6 +61,10 @@ google-cloud-dataproc = "^5.4.1"
apache-airflow = "^2.7.0"
apache-airflow-providers-google = "^10.10.1"


[tool.poetry.group.dev-dependencies.dependencies]
pydoclint = "^0.3.8"

[tool.semantic_release]
branch = "main"
version_variable = "src/otg/__init__.py:__version__"
@@ -112,3 +116,12 @@ docstring-quotes = "double"

[tool.ruff.pydocstyle]
convention = "google"

[tool.pydoclint]
style = 'google'
exclude = '\.git|\.venv|__init__.py'
require-return-section-when-returning-nothing = false
check-return-types = true
allow-init-docstring = true
arg-type-hints-in-docstring = true
skip-checking-short-docstrings = false
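Two of these settings are worth spelling out: require-return-section-when-returning-nothing = false lets a function annotated -> None omit the Returns section, and arg-type-hints-in-docstring = true expects the parenthesised type after each argument name. A hypothetical function that passes under this configuration:

# Hypothetical illustration of the [tool.pydoclint] settings above: no Returns
# section is needed because the function returns None, and each argument
# carries its type in parentheses in the docstring.


def log_cluster_name(cluster_name: str, verbose: bool = False) -> None:
    """Print the cluster name for debugging.

    Args:
        cluster_name (str): Name of the cluster.
        verbose (bool): Whether to add a prefix to the output. Defaults to False.
    """
    prefix = "Dataproc cluster: " if verbose else ""
    print(f"{prefix}{cluster_name}")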
82 changes: 66 additions & 16 deletions src/airflow/dags/common_airflow.py
@@ -3,6 +3,7 @@
from __future__ import annotations

import os
from typing import Any

import pendulum
from airflow.providers.google.cloud.operators.dataproc import (
@@ -55,12 +56,22 @@


def create_cluster(
cluster_name,
master_machine_type="n1-standard-4",
worker_machine_type="n1-standard-16",
num_workers=0,
):
"""Generate an Airflow task to create a Dataproc cluster. Common parameters are reused, and varying parameters can be specified as needed."""
cluster_name: str,
master_machine_type: str = "n1-standard-4",
worker_machine_type: str = "n1-standard-16",
num_workers: int = 0,
) -> DataprocCreateClusterOperator:
"""Generate an Airflow task to create a Dataproc cluster. Common parameters are reused, and varying parameters can be specified as needed.

Args:
cluster_name (str): Name of the cluster.
master_machine_type (str): Machine type for the master node. Defaults to "n1-standard-4".
worker_machine_type (str): Machine type for the worker nodes. Defaults to "n1-standard-16".
num_workers (int): Number of worker nodes. Defaults to 0.

Returns:
DataprocCreateClusterOperator: Airflow task to create a Dataproc cluster.
"""
cluster_generator_config = ClusterGenerator(
project_id=project_id,
zone=zone,
@@ -89,8 +100,20 @@ def create_cluster(
)


def submit_job(cluster_name, task_id, job_type, job_specification):
"""Submit an arbitrary job to a Dataproc cluster."""
def submit_job(
cluster_name: str, task_id: str, job_type: str, job_specification: dict[str, Any]
) -> DataprocSubmitJobOperator:
"""Submit an arbitrary job to a Dataproc cluster.

Args:
cluster_name (str): Name of the cluster.
task_id (str): Name of the task.
job_type (str): Type of the job to submit.
job_specification (dict[str, Any]): Specification of the job to submit.

Returns:
DataprocSubmitJobOperator: Airflow task to submit an arbitrary job to a Dataproc cluster.
"""
return DataprocSubmitJobOperator(
task_id=task_id,
region=region,
@@ -104,17 +127,30 @@ def submit_job(cluster_name, task_id, job_type, job_specification):
)


def submit_pyspark_job(cluster_name, task_id, python_module_path, args):
"""Submit a PySpark job to a Dataproc cluster."""
def submit_pyspark_job(
cluster_name: str, task_id: str, python_module_path: str, args: dict[str, Any]
) -> DataprocSubmitJobOperator:
"""Submit a PySpark job to a Dataproc cluster.

Args:
cluster_name (str): Name of the cluster.
task_id (str): Name of the task.
python_module_path (str): Path to the Python module to run.
args (dict[str, Any]): Arguments to pass to the Python module.

Returns:
DataprocSubmitJobOperator: Airflow task to submit a PySpark job to a Dataproc cluster.
"""
formatted_args = []
if isinstance(args, dict):
args = [f"--{arg}={val}" for arg, val in args.items()]
formatted_args = [f"--{arg}={val}" for arg, val in args.items()]
return submit_job(
cluster_name=cluster_name,
task_id=task_id,
job_type="pyspark_job",
job_specification={
"main_python_file_uri": f"{initialisation_base_path}/{python_module_path}",
"args": args,
"args": formatted_args,
"properties": {
"spark.jars": "/opt/conda/miniconda3/lib/python3.10/site-packages/hail/backend/hail-all-spark.jar",
"spark.driver.extraClassPath": "/opt/conda/miniconda3/lib/python3.10/site-packages/hail/backend/hail-all-spark.jar",
@@ -126,8 +162,15 @@ def submit_pyspark_job(cluster_name, task_id, python_module_path, args):
)


def install_dependencies(cluster_name):
"""Install dependencies on a Dataproc cluster."""
def install_dependencies(cluster_name: str) -> DataprocSubmitJobOperator:
"""Install dependencies on a Dataproc cluster.

Args:
cluster_name (str): Name of the cluster.

Returns:
DataprocSubmitJobOperator: Airflow task to install dependencies on a Dataproc cluster.
"""
return submit_job(
cluster_name=cluster_name,
task_id="install_dependencies",
@@ -146,8 +189,15 @@ def install_dependencies(cluster_name):
)


def delete_cluster(cluster_name):
"""Generate an Airflow task to delete a Dataproc cluster."""
def delete_cluster(cluster_name: str) -> DataprocDeleteClusterOperator:
"""Generate an Airflow task to delete a Dataproc cluster.

Args:
cluster_name (str): Name of the cluster.

Returns:
DataprocDeleteClusterOperator: Airflow task to delete a Dataproc cluster.
"""
return DataprocDeleteClusterOperator(
task_id="delete_cluster",
project_id=project_id,
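Taken together, the typed helpers in common_airflow.py compose into a DAG. A minimal sketch, assuming the module is importable as common_airflow from the DAGs folder; the DAG id, schedule, PySpark module path, and arguments are hypothetical:

"""Hypothetical DAG wiring the helpers above; not part of this diff."""
from __future__ import annotations

from airflow.models.dag import DAG

import common_airflow as common

with DAG(dag_id="example_dataproc_pipeline", schedule=None) as dag:
    cluster_name = "example-cluster"
    (
        common.create_cluster(cluster_name)
        >> common.install_dependencies(cluster_name)
        >> common.submit_pyspark_job(
            cluster_name=cluster_name,
            task_id="run_example_step",
            python_module_path="otg/example_step.py",  # hypothetical module
            args={"output_path": "gs://example-bucket/output"},  # hypothetical args
        )
        >> common.delete_cluster(cluster_name)
    )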
7 changes: 5 additions & 2 deletions src/conftest.py
@@ -10,14 +10,17 @@


@pytest.fixture(scope="session", autouse=True)
def spark(doctest_namespace: dict[str, Any], tmp_path_factory: Any) -> SparkSession:
def spark(
doctest_namespace: dict[str, Any], tmp_path_factory: pytest.TempPathFactory
) -> SparkSession:
"""Local spark session for testing purposes.

It returns a session and makes it available to doctests through
the `spark` namespace.

Args:
doctest_namespace (Dict[str, Any]): pytest namespace for doctests
doctest_namespace (dict[str, Any]): pytest namespace for doctests
tmp_path_factory (pytest.TempPathFactory): pytest tmp_path_factory

Returns:
SparkSession: local spark session
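Because the fixture pushes the session into doctest_namespace, module docstrings can reference spark directly in their doctest examples. A hedged sketch of a hypothetical dataframe helper written against that convention:

# Hypothetical helper (not in this diff) whose doctest relies on the `spark`
# entry injected into doctest_namespace by the fixture above.
from __future__ import annotations

from pyspark.sql import DataFrame, functions as f


def upper_case_names(df: DataFrame) -> DataFrame:
    """Upper-case the `name` column of a dataframe.

    Args:
        df (DataFrame): Input dataframe with a `name` column.

    Returns:
        DataFrame: Dataframe with the `name` column upper-cased.

    Examples:
        >>> df = spark.createDataFrame([("alice",)], ["name"])
        >>> upper_case_names(df).first().name
        'ALICE'
    """
    return df.withColumn("name", f.upper(f.col("name")))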
12 changes: 5 additions & 7 deletions src/otg/common/Liftover.py
@@ -26,17 +26,15 @@ class LiftOverSpark:
- If regions are provided, the mapping is dropped if the new region is reversed (mapped_start > mapped_end).
- If regions are provided, the mapping is dropped if the difference between the length of the mapped region and the original is larger than a threshold.
- When lifting over intervals, only unique coordinates are lifted, then joined back to the original dataframe.

Args:
chain_file (str): Path to the chain file
max_difference (int): Maximum difference between the length of the mapped region and the original region. Defaults to 100.
"""

def __init__(
def __init__( # noqa: D107
self: LiftOverSpark, chain_file: str, max_difference: int = 100
) -> None:
"""Intialise LiftOverSpark object.

Args:
chain_file (str): Path to the chain file
max_difference (int): Maximum difference between the length of the mapped region and the original region. Defaults to 100.
"""
self.chain_file = chain_file
self.max_difference = max_difference

4 changes: 2 additions & 2 deletions src/otg/common/schemas.py
@@ -29,8 +29,8 @@ def flatten_schema(schema: StructType, prefix: str = "") -> list:
"""It takes a Spark schema and returns a list of all fields in the schema once flattened.

Args:
schema: The schema of the dataframe
prefix: The prefix to prepend to the field names.
schema (StructType): The schema of the dataframe
prefix (str): The prefix to prepend to the field names. Defaults to "".

Returns:
list: A list of all the columns in the dataframe.