Skip to content

Commit

Permalink
fixup! Add DAG Model endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil Breguła committed Jul 6, 2020
1 parent 5bd1569 commit 080a806
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 13 deletions.
14 changes: 7 additions & 7 deletions airflow/api_connexion/endpoints/dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from flask import current_app, request
from flask import current_app
from sqlalchemy import func

from airflow import DAG
from airflow.api_connexion import parameters
from airflow.api_connexion.exceptions import NotFound
from airflow.api_connexion.parameters import check_limit, format_parameters
from airflow.api_connexion.schemas.dag_schema import (
DAGCollection, dag_detail_schema, dag_schema, dags_collection_schema,
)
Expand All @@ -37,7 +37,7 @@ def get_dag(dag_id, session):
if dag is None:
raise NotFound("DAG not found")

return dag_schema.dump(dag).data
return dag_schema.dump(dag)


def get_dag_details(dag_id):
Expand All @@ -50,14 +50,14 @@ def get_dag_details(dag_id):
return dag_detail_schema.dump(dag)


@format_parameters({
'limit': check_limit
})
@provide_session
def get_dags(session):
def get_dags(session, limit, offset=0):
"""
Get all DAGs.
"""
offset = request.args.get(parameters.page_offset, 0)
limit = min(int(request.args.get(parameters.page_limit, 100)), 100)

dags = session.query(DagModel).order_by(DagModel.dag_id).offset(offset).limit(limit).all()

total_entries = session.query(func.count(DagModel.dag_id)).scalar()
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class DAGCollectionSchema(Schema):
total_entries = fields.Int()


dags_collection_schema = DAGCollectionSchema(strict=True)
dag_schema = DAGSchema(strict=True)
dags_collection_schema = DAGCollectionSchema()
dag_schema = DAGSchema()

dag_detail_schema = DAGDetailSchema(strict=True)
dag_detail_schema = DAGDetailSchema()
16 changes: 13 additions & 3 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def test_should_response_200(self):
'dag_id': 'TEST_DAG_1',
'description': None,
'fileloc': '/tmp/test-dag.py',
'is_paused': True,
'is_paused': False,
'is_subdag': False,
'owners': [],
'root_dag_id': None,
Expand Down Expand Up @@ -178,7 +178,7 @@ def test_should_response_200(self):
"dag_id": "TEST_DAG_1",
"description": None,
"fileloc": "/tmp/dag_1.py",
"is_paused": True,
"is_paused": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand All @@ -189,7 +189,7 @@ def test_should_response_200(self):
"dag_id": "TEST_DAG_2",
"description": None,
"fileloc": "/tmp/dag_2.py",
"is_paused": True,
"is_paused": False,
"is_subdag": False,
"owners": [],
"root_dag_id": None,
Expand Down Expand Up @@ -248,6 +248,16 @@ def test_should_response_200_and_handle_pagination(self, url, expected_dag_ids):
self.assertEqual(expected_dag_ids, dag_ids)
self.assertEqual(10, response.json['total_entries'])

def test_should_response_200_default_limit(self):
self._create_dag_models(101)

response = self.client.get("api/v1/dags")

assert response.status_code == 200

self.assertEqual(100, len(response.json['dags']))
self.assertEqual(101, response.json['total_entries'])


class TestPatchDag(TestDagEndpoint):
@pytest.mark.skip(reason="Not implemented yet")
Expand Down

0 comments on commit 080a806

Please sign in to comment.