Skip to content

Commit

Permalink
Polish around UI and events
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Oct 29, 2015
1 parent 46bbfca commit aed66c7
Show file tree
Hide file tree
Showing 8 changed files with 111 additions and 85 deletions.
23 changes: 14 additions & 9 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,23 @@ def backfill(args):


def trigger_dag(args):
log_to_stdout()
session = settings.Session()
# TODO: verify dag_id
dag_id = args.dag_id
run_id = args.run_id or None
execution_date = datetime.now()
trigger = DagRun(
dag_id=dag_id,
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
external_trigger=True)
session.add(trigger)
dr = session.query(DagRun).filter(
DagRun.dag_id==args.dag_id, DagRun.run_id==args.run_id).first()
if dr:
logging.error("This run_id already exists")
else:
trigger = DagRun(
dag_id=args.dag_id,
run_id=args.run_id,
execution_date=execution_date,
state=State.RUNNING,
external_trigger=True)
session.add(trigger)
logging.info("Created {}".format(trigger))
session.commit()


Expand Down
8 changes: 5 additions & 3 deletions airflow/example_dags/example_python_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@
import time
from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
seven_days_ago = datetime.combine(
datetime.today() - timedelta(7), datetime.min.time())

args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}

dag = DAG(dag_id='example_python_operator', default_args=args)
dag = DAG(
dag_id='example_python_operator', default_args=args,
schedule_interval=None)


def my_sleeping_function(random_base):
Expand Down
45 changes: 23 additions & 22 deletions airflow/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,28 +342,29 @@ def schedule_dag(self, dag):
This method checks whether a new DagRun needs to be created
for a DAG based on scheduling interval
"""
DagRun = models.DagRun
session = settings.Session()
qry = session.query(func.max(DagRun.execution_date)).filter(and_(
DagRun.dag_id == dag.dag_id,
DagRun.external_trigger == False
))
last_scheduled_run = qry.scalar()
if not last_scheduled_run:
next_run_date = min([t.start_date for t in dag.tasks])
else:
next_run_date = dag.following_schedule(last_scheduled_run)

if next_run_date <= datetime.now():
next_run = DagRun(
dag_id=dag.dag_id,
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
state=State.RUNNING,
external_trigger=False
)
session.add(next_run)
session.commit()
if dag.schedule_interval:
DagRun = models.DagRun
session = settings.Session()
qry = session.query(func.max(DagRun.execution_date)).filter(and_(
DagRun.dag_id == dag.dag_id,
DagRun.external_trigger == False
))
last_scheduled_run = qry.scalar()
if not last_scheduled_run:
next_run_date = min([t.start_date for t in dag.tasks])
else:
next_run_date = dag.following_schedule(last_scheduled_run)

if next_run_date <= datetime.now():
next_run = DagRun(
dag_id=dag.dag_id,
run_id='scheduled__' + next_run_date.isoformat(),
execution_date=next_run_date,
state=State.RUNNING,
external_trigger=False
)
session.add(next_run)
session.commit()

def process_dag(self, dag, executor):
"""
Expand Down
18 changes: 14 additions & 4 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from builtins import object, bytes
import copy
from datetime import datetime, timedelta
from dateutil import relativedelta
import functools
import getpass
import imp
Expand All @@ -26,7 +27,7 @@

from sqlalchemy import (
Column, Integer, String, DateTime, Text, Boolean, ForeignKey, PickleType,
Index, BigInteger)
Index)
from sqlalchemy import case, func, or_, and_
from sqlalchemy.ext.declarative import declarative_base, declared_attr
from sqlalchemy.dialects.mysql import LONGTEXT
Expand Down Expand Up @@ -62,7 +63,7 @@
LongText = Text


def clear_task_instances(tis, session):
def clear_task_instances(tis, session, activate_dag_runs=True):
'''
Clears a set of task instances, but makes sure the running ones
get killed.
Expand All @@ -79,6 +80,15 @@ def clear_task_instances(tis, session):
from airflow.jobs import BaseJob as BJ
for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
job.state = State.SHUTDOWN
if activate_dag_runs:
execution_dates = {ti.execution_date for ti in tis}
dag_ids = {ti.dag_id for ti in tis}
drs = session.query(DagRun).filter(
DagRun.dag_id.in_(dag_ids),
DagRun.execution_date.in_(execution_dates),
).all()
for dr in drs:
dr.state = State.RUNNING


class DagBag(object):
Expand Down Expand Up @@ -1970,14 +1980,14 @@ def following_schedule(self, dttm):
if isinstance(self.schedule_interval, six.string_types):
cron = croniter(self.schedule_interval, dttm)
return cron.get_next(datetime)
else:
elif isinstance(self.schedule_interval, timedelta):
return dttm + self.schedule_interval

def previous_schedule(self, dttm):
if isinstance(self.schedule_interval, six.string_types):
cron = croniter(self.schedule_interval, dttm)
return cron.get_prev(datetime)
else:
elif isinstance(self.schedule_interval, timedelta):
return dttm - self.schedule_interval

@property
Expand Down
4 changes: 3 additions & 1 deletion airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def date_range(
start_date,
end_date=None,
num=None,
delta=timedelta(1)):
delta=None):
"""
Get a set of dates as a list based on a start, end and delta, delta
can be something that can be added to ``datetime.datetime``
Expand Down Expand Up @@ -315,6 +315,8 @@ def date_range(
datetime.datetime(2016, 2, 1, 0, 0),
datetime.datetime(2016, 3, 1, 0, 0)]
"""
if not delta:
return []
if end_date and start_date > end_date:
raise Exception("Wait. start_date needs to be before end_date")
if end_date and num:
Expand Down
67 changes: 41 additions & 26 deletions airflow/www/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,36 +28,51 @@ def create_app(config=None):
admin = Admin(
app, name='Airflow',
static_url_path='/admin',
index_view=HomeView(endpoint='', url='/admin'),
index_view=HomeView(name='DAGs', endpoint='', url='/admin'),
template_mode='bootstrap3',
)

from airflow.www import views
admin.add_view(views.Airflow(name='DAGs'))

admin.add_view(views.SlaMissModelView(models.SlaMiss, Session, name="SLA Misses", category="Browse"))
admin.add_view(
views.TaskInstanceModelView(models.TaskInstance, Session, name="Task Instances", category="Browse")
)

admin.add_view(views.LogModelView(models.Log, Session, name="Logs", category="Browse"))
admin.add_view(views.JobModelView(jobs.BaseJob, Session, name="Jobs", category="Browse"))

admin.add_view(views.QueryView(name='Ad Hoc Query', category="Data Profiling"))
admin.add_view(views.ChartModelView(models.Chart, Session, name="Charts", category="Data Profiling"))
admin.add_view(views.KnowEventView(models.KnownEvent, Session, name="Known Events", category="Data Profiling"))

admin.add_view(views.PoolModelView(models.Pool, Session, name="Pools", category="Admin"))
admin.add_view(views.ConfigurationView(name='Configuration', category="Admin"))
admin.add_view(views.UserModelView(models.User, Session, name="Users", category="Admin"))
admin.add_view(views.ConnectionModelView(models.Connection, Session, name="Connections", category="Admin"))
admin.add_view(views.VariableView(models.Variable, Session, name="Variables", category="Admin"))

admin.add_link(base.MenuLink(category='Docs', name='Documentation', url='http://pythonhosted.org/airflow/'))
admin.add_link(base.MenuLink(category='Docs',name='Github',url='https://github.com/airbnb/airflow'))

admin.add_view(views.DagModelView(models.DagModel, Session, name=None))
admin.add_view(views.DagRunModelView(models.DagRun, Session, name="DAG Runs", category="Browse"))
av = admin.add_view
vs = views
av(vs.Airflow(name='DAGs'))

av(vs.QueryView(name='Ad Hoc Query', category="Data Profiling"))
av(vs.ChartModelView(
models.Chart, Session, name="Charts", category="Data Profiling"))
av(vs.KnowEventView(
models.KnownEvent,
Session, name="Known Events", category="Data Profiling"))
av(vs.SlaMissModelView(
models.SlaMiss,
Session, name="SLA Misses", category="Browse"))
av(vs.TaskInstanceModelView(models.TaskInstance,
Session, name="Task Instances", category="Browse"))
av(vs.LogModelView(
models.Log, Session, name="Logs", category="Browse"))
av(vs.JobModelView(
jobs.BaseJob, Session, name="Jobs", category="Browse"))
av(vs.PoolModelView(
models.Pool, Session, name="Pools", category="Admin"))
av(vs.ConfigurationView(
name='Configuration', category="Admin"))
av(vs.UserModelView(
models.User, Session, name="Users", category="Admin"))
av(vs.ConnectionModelView(
models.Connection, Session, name="Connections", category="Admin"))
av(vs.VariableView(
models.Variable, Session, name="Variables", category="Admin"))

admin.add_link(base.MenuLink(
category='Docs', name='Documentation',
url='http://pythonhosted.org/airflow/'))
admin.add_link(
base.MenuLink(category='Docs',
name='Github',url='https://github.com/airbnb/airflow'))

av(vs.DagRunModelView(
models.DagRun, Session, name="DAG Runs", category="Browse"))
av(vs.DagModelView(models.DagModel, Session, name=None))
# Hack to not add this view to the menu
admin._menu = admin._menu[:-1]

Expand Down
11 changes: 0 additions & 11 deletions airflow/www/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@ class DateTimeForm(Form):
"Execution date", widget=DateTimePickerWidget())


class GraphForm(Form):
execution_date = DateTimeField(
"Execution date", widget=DateTimePickerWidget())
arrange = SelectField("Layout", choices=(
('LR', "Left->Right"),
('RL', "Right->Left"),
('TB', "Top->Bottom"),
('BT', "Bottom->Top"),
))


class TreeForm(Form):
base_date = DateTimeField(
"Anchor date", widget=DateTimePickerWidget(), default=datetime.now())
Expand Down
20 changes: 11 additions & 9 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from wtforms import (
widgets,
Form, DateTimeField, SelectField, TextAreaField, PasswordField, StringField)
Form, SelectField, TextAreaField, PasswordField, StringField)

from pygments import highlight, lexers
from pygments.formatters import HtmlFormatter
Expand All @@ -41,14 +41,14 @@
from airflow import models
from airflow.settings import Session
from airflow import login
from airflow.configuration import conf, AirflowConfigException
from airflow.configuration import conf
from airflow import utils
from airflow.utils import AirflowException
from airflow.www import utils as wwwutils
from airflow import settings
from airflow.models import State

from airflow.www.forms import DateTimeForm, TreeForm, GraphForm
from airflow.www.forms import DateTimeForm, TreeForm

QUERY_LIMIT = 100000
CHART_LIMIT = 200000
Expand Down Expand Up @@ -1032,23 +1032,24 @@ def tree(self):

dates = utils.date_range(
base_date, num=-abs(num_runs), delta=dag.schedule_interval)
min_date = dates[0] if dates else datetime(2000, 1, 1)

DR = models.DagRun
dag_runs = (
session.query(DR)
.filter(
DR.dag_id==dag.dag_id,
DR.execution_date>=dates[0],
DR.execution_date<=dates[-1])
DR.execution_date<=base_date,
DR.execution_date>=min_date)
.all()
)
dag_runs = {
dr.execution_date: utils.alchemy_to_dict(dr) for dr in dag_runs}

tis = dag.get_task_instances(
session, start_date=dates[0], end_date=dates[-1])
session, start_date=min_date, end_date=base_date)
dates = sorted(list({ti.execution_date for ti in tis}))
max_date = max([ti.execution_date for ti in tis])
max_date = max([ti.execution_date for ti in tis]) if dates else None
task_instances = {}
for ti in tis:
tid = utils.alchemy_to_dict(ti)
Expand Down Expand Up @@ -1180,7 +1181,7 @@ def get_upstream(task):
dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
if dttm == dr.execution_date:
dr_state = dr.state
flash(str(dttm))

class GraphForm(Form):
execution_date = SelectField("DAG run", choices=dr_choices)
arrange = SelectField("Layout", choices=(
Expand All @@ -1189,7 +1190,8 @@ class GraphForm(Form):
('TB', "Top->Bottom"),
('BT', "Bottom->Top"),
))
form = GraphForm(data={'execution_date': dttm.isoformat(), 'arrange': arrange})
form = GraphForm(
data={'execution_date': dttm.isoformat(), 'arrange': arrange})

task_instances = {
ti.task_id: utils.alchemy_to_dict(ti)
Expand Down

0 comments on commit aed66c7

Please sign in to comment.