[EWT-569] Airflow Upgrade to 1.10.14, Cherry-Pick 299b4d8 from 1.10.4
CP contains [TWTR] CP from 1.10+twtr (twitter-forks#35)

* 99ee040: CP from 1.10+twtr

* 2e01c24: CP from 1.10.4 ([TWTR][AIRFLOW-4939] Fixup use of fallback kwarg in conf.getint)

* 00cb4ae: [TWTR][AIRFLOW-XXXX] Cherry-pick d4a83bc and bump version (twitter-forks#21)

* CP 51b1aee: Relax version requirements (twitter-forks#24)

* CP 67a4d1c: [CX-16266] Change with reference to 1a4c164 commit in open source (twitter-forks#25)

* CP 54bd095: [TWTR][CX-17516] Queue tasks already being handled by the executor (twitter-forks#26)

* CP 87fcc1c: [TWTR][CX-17516] Requeue tasks in the queued state (twitter-forks#27)

* CP 98a1ca9: [AIRFLOW-6625] Explicitly log using utf-8 encoding (apache#7247) (twitter-forks#31)

* fixing models.py and jobs.py after CP

* fixing typo and version bump

Co-authored-by: Vishesh Jain <visheshj@twitter.com>
2 people authored and Ayush Sethi committed Dec 16, 2020
1 parent 78dafd6 commit c404b13
Showing 11 changed files with 179 additions and 8 deletions.
4 changes: 2 additions & 2 deletions airflow/executors/base_executor.py
@@ -56,9 +56,9 @@ def queue_command(self, simple_task_instance, command, priority=1, queue=None):
key = simple_task_instance.key
if key not in self.queued_tasks and key not in self.running:
self.log.info("Adding to queue: %s", command)
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
else:
self.log.info("could not queue task %s", key)
self.log.info("Adding to queue even though already queued or running {}".format(command, key))
self.queued_tasks[key] = (command, priority, queue, simple_task_instance)

def queue_task_instance(
self,
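In effect, queue_command now stores the task in queued_tasks in both branches instead of dropping it when the key is already queued or running. A minimal sketch of the resulting behavior, assuming the standard Airflow 1.10 BaseExecutor attributes (queued_tasks, running, log); this is not the verbatim fork code:

    # Sketch of the post-change behavior.
    def queue_command(self, simple_task_instance, command, priority=1, queue=None):
        key = simple_task_instance.key
        if key not in self.queued_tasks and key not in self.running:
            self.log.info("Adding to queue: %s", command)
        else:
            self.log.info("Adding to queue even though already queued or running: %s %s", command, key)
        # Unconditional insert: a resubmission for the same key overwrites the earlier entry.
        self.queued_tasks[key] = (command, priority, queue, simple_task_instance)
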
52 changes: 51 additions & 1 deletion airflow/jobs/scheduler_job.py
@@ -134,7 +134,10 @@ def _run_file_processor(result_channel,
stdout = StreamLogWriter(log, logging.INFO)
stderr = StreamLogWriter(log, logging.WARN)

log.info("Setting log context for file {}".format(file_path))
# log file created here
set_context(log, file_path)
log.info("Successfully set log context for file {}".format(file_path))
setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))

try:
@@ -154,6 +157,7 @@ def _run_file_processor(result_channel,
log.info("Started process (PID=%s) to work on %s",
os.getpid(), file_path)
scheduler_job = SchedulerJob(dag_ids=dag_ids, log=log)
log.info("Processing file {}".format(file_path))
result = scheduler_job.process_file(file_path,
zombies,
pickle_dags)
@@ -1030,7 +1034,7 @@ def _find_executable_task_instances(self, simple_dag_bag, states, session=None):

if self.executor.has_task(task_instance):
self.log.debug(
"Not handling task %s as the executor reports it is running",
"Still handling task %s even though as the executor reports it is running",
task_instance.key
)
num_tasks_in_executor += 1
@@ -1454,6 +1458,50 @@ def _execute_helper(self):

# Send tasks for execution if available
simple_dag_bag = SimpleDagBag(simple_dags)
if len(simple_dags) > 0:
try:
simple_dag_bag = SimpleDagBag(simple_dags)

# Handle cases where a DAG run state is set (perhaps manually) to
# a non-running state. Handle task instances that belong to
# DAG runs in those states

# If a task instance is up for retry but the corresponding DAG run
# isn't running, mark the task instance as FAILED so we don't try
# to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.UP_FOR_RETRY],
State.FAILED)
# If a task instance is scheduled or queued or up for reschedule,
# but the corresponding DAG run isn't running, set the state to
# NONE so we don't try to re-run it.
self._change_state_for_tis_without_dagrun(simple_dag_bag,
[State.QUEUED,
State.SCHEDULED,
State.UP_FOR_RESCHEDULE],
State.NONE)

scheduled_dag_ids = ", ".join(simple_dag_bag.dag_ids)
self.log.info('DAGs to be executed: {}'.format(scheduled_dag_ids))

# TODO(CX-17516): State.QUEUED has been added here as a hack, because the Celery
# Executor does not reliably enqueue tasks with the MySQL broker, and we have
# seen tasks hang after they get queued. The effect of this hack is queued tasks
# will constantly be requeued and resent to the executor (Celery).
# This should be removed when we switch away from the MySQL Celery backend.
self._execute_task_instances(simple_dag_bag,
(State.SCHEDULED, State.QUEUED))

except Exception as e:
self.log.error("Error queuing tasks")
self.log.exception(e)
continue

# Call heartbeats
self.log.debug("Heartbeating the executor")
self.executor.heartbeat()

self._change_state_for_tasks_failed_to_execute()

if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
continue
@@ -1490,7 +1538,9 @@ def _execute_helper(self):
sleep(sleep_length)

# Stop any processors
self.log.info("Terminating DAG processors")
self.processor_agent.terminate()
self.log.info("All DAG processors terminated")

# Verify that all files were processed, and if so, deactivate DAGs that
# haven't been touched by the scheduler as they likely have been
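The CX-17516 workaround above relies on _execute_task_instances passing its states tuple down to the query that selects candidate task instances; with State.QUEUED included, tasks the executor has already accepted are selected again on every scheduler loop and resent to Celery. A hypothetical simplification of that selection step (the real 1.10 query also joins DAG runs and enforces pool and concurrency limits):

    from airflow.models import TaskInstance as TI
    from airflow.utils.state import State

    def find_candidate_tis(session, states=(State.SCHEDULED, State.QUEUED)):
        # Including State.QUEUED means a task stuck in the queued state keeps
        # reappearing here until the executor actually runs it.
        return session.query(TI).filter(TI.state.in_(states)).all()
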
3 changes: 2 additions & 1 deletion airflow/models/baseoperator.py
@@ -377,7 +377,8 @@ def __init__(
self
)
self._schedule_interval = schedule_interval
self.retries = retries
self.retries = retries if retries is not None else \
int(configuration.conf.get('core', 'default_task_retries', fallback=0))
self.queue = queue
self.pool = pool
self.pool_slots = pool_slots
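With this change, an operator constructed without an explicit retries value falls back to the [core] default_task_retries option, defaulting to 0 when the option is unset. The same fallback logic in isolation (a standalone sketch, not the operator code; the helper name is hypothetical):

    from airflow import configuration

    def resolve_retries(retries=None):
        # Explicit value wins; otherwise read [core] default_task_retries, default 0.
        if retries is not None:
            return retries
        return int(configuration.conf.get('core', 'default_task_retries', fallback=0))
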
6 changes: 6 additions & 0 deletions airflow/models/dag.py
@@ -1506,6 +1506,8 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):
"""
from airflow.models.serialized_dag import SerializedDagModel

self.log.info("Attempting to sync DAG {} to DB".format(self._dag_id))

if owner is None:
owner = self.owner
if sync_time is None:
@@ -1539,8 +1541,12 @@ def sync_to_db(self, owner=None, sync_time=None, session=None):

session.commit()

self.log.info("Synced DAG %s to DB", self._dag_id)

for subdag in self.subdags:
self.log.info("Syncing SubDAG %s", subdag._dag_id)
subdag.sync_to_db(owner=owner, sync_time=sync_time, session=session)
self.log.info("Successfully synced SubDAG %s", subdag._dag_id)

# Write DAGs to serialized_dag table in DB.
# subdags are not written into serialized_dag, because they are not displayed
1 change: 1 addition & 0 deletions airflow/models/taskinstance.py
@@ -1004,6 +1004,7 @@ def signal_handler(signum, frame):
self.state = State.SUCCESS
except AirflowSkipException as e:
# Recording SKIP
# This change is in reference to [AIRFLOW-5653][CX-16266]
# log only if exception has any arguments to prevent log flooding
if e.args:
self.log.info(e)
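The added condition means a skip is only written to the task log when the AirflowSkipException carries a message; a bare raise produces no log line, which prevents flooding. For example (hypothetical task callables):

    from airflow.exceptions import AirflowSkipException

    def skip_with_reason():
        # e.args is non-empty, so the skip is logged with this message.
        raise AirflowSkipException("upstream produced no data for this partition")

    def skip_silently():
        # e.args is empty, so the skip is recorded without an extra log line.
        raise AirflowSkipException()
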
7 changes: 5 additions & 2 deletions airflow/settings.py
@@ -37,6 +37,7 @@

from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG # NOQA F401
from airflow.logging_config import configure_logging
from airflow.utils.module_loading import import_string
from airflow.utils.sqlalchemy import setup_event_handlers

log = logging.getLogger(__name__)
@@ -274,7 +275,8 @@ def prepare_engine_args(disable_connection_pool=False):
# The maximum overflow size of the pool.
# When the number of checked-out connections reaches the size set in pool_size,
# additional connections will be returned up to this limit.
# When those additional connections are returned to the pool, they are disconnected and discarded.
# When those additional connections are returned to the pool, they are
# disconnected and discarded.
# It follows then that the total number of simultaneous connections
# the pool will allow is pool_size + max_overflow,
# and the total number of “sleeping” connections the pool will allow is pool_size.
@@ -296,7 +298,8 @@ def prepare_engine_args(disable_connection_pool=False):
# https://docs.sqlalchemy.org/en/13/core/pooling.html#disconnect-handling-pessimistic
pool_pre_ping = conf.getboolean('core', 'SQL_ALCHEMY_POOL_PRE_PING', fallback=True)

log.debug("settings.prepare_engine_args(): Using pool settings. pool_size=%d, max_overflow=%d, "
log.debug("settings.prepare_engine_args(): Using pool settings. pool_size=%d, "
"max_overflow=%d, "
"pool_recycle=%d, pid=%d", pool_size, max_overflow, pool_recycle, os.getpid())
engine_args['pool_size'] = pool_size
engine_args['pool_recycle'] = pool_recycle
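As the reflowed comments say, the pool allows at most pool_size + max_overflow simultaneous connections (15 with the defaults of 5 and 10) and keeps at most pool_size idle. A rough sketch of how these settings end up in create_engine, with assumed default values hard-coded for illustration and a hypothetical helper name:

    from sqlalchemy import create_engine

    def make_engine(sql_alchemy_conn):
        # Assumed defaults: sql_alchemy_pool_size=5, sql_alchemy_max_overflow=10,
        # sql_alchemy_pool_recycle=1800, sql_alchemy_pool_pre_ping=True.
        engine_args = dict(pool_size=5, max_overflow=10, pool_recycle=1800, pool_pre_ping=True)
        # Up to 5 + 10 = 15 connections may be open at once; overflow connections
        # are discarded when returned to the pool rather than kept idle.
        return create_engine(sql_alchemy_conn, encoding='utf-8', **engine_args)
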
3 changes: 3 additions & 0 deletions airflow/utils/dag_processing.py
@@ -1254,6 +1254,8 @@ def start_new_processes(self):
)
self._processors[file_path] = processor

self.log.info("Number of active file processors: {}".format(len(self._processors)))

def prepare_file_path_queue(self):
"""
Generate more file paths to process. Results are saved in _file_path_queue.
@@ -1336,6 +1338,7 @@ def _find_zombies(self, session):

self._zombies = zombies


def _kill_timed_out_processors(self):
"""
Kill any file processors that timeout to defend against process hangs.
2 changes: 2 additions & 0 deletions airflow/utils/log/file_processor_handler.py
@@ -138,12 +138,14 @@ def _init_file(self, filename):

if not os.path.exists(directory):
try:
logging.info("Creating directory {}".format(directory))
os.makedirs(directory)
except OSError:
if not os.path.isdir(directory):
raise

if not os.path.exists(full_path):
logging.info("Creating file {}".format(full_path))
open(full_path, "a").close()

return full_path
2 changes: 1 addition & 1 deletion airflow/utils/log/file_task_handler.py
@@ -53,7 +53,7 @@ def set_context(self, ti):
:param ti: task instance object
"""
local_loc = self._init_file(ti)
self.handler = logging.FileHandler(local_loc)
self.handler = logging.FileHandler(local_loc, encoding='utf-8')
if self.formatter:
self.handler.setFormatter(self.formatter)
self.handler.setLevel(self.level)
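The only functional change here is passing encoding='utf-8' to FileHandler, so task log lines containing non-ASCII characters are written as UTF-8 instead of whatever the platform default happens to be. A standalone illustration (the log path and logger name are hypothetical):

    import logging

    handler = logging.FileHandler("/tmp/example_task.log", encoding='utf-8')  # hypothetical path
    logger = logging.getLogger("example_task")
    logger.addHandler(handler)
    logger.warning(u"très bien")  # written as UTF-8 regardless of the locale
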
1 change: 0 additions & 1 deletion airflow/version.py
@@ -20,4 +20,3 @@

version = '1.10.14+test1'


106 changes: 106 additions & 0 deletions tests/test_sqlalchemy_config.py
@@ -0,0 +1,106 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest

from sqlalchemy.pool import NullPool

from airflow import settings
from tests.compat import patch
from tests.test_utils.config import conf_vars

SQL_ALCHEMY_CONNECT_ARGS = {
'test': 43503,
'dict': {
'is': 1,
'supported': 'too'
}
}


class TestSqlAlchemySettings(unittest.TestCase):
def setUp(self):
self.old_engine = settings.engine
self.old_session = settings.Session
self.old_conn = settings.SQL_ALCHEMY_CONN
settings.SQL_ALCHEMY_CONN = "mysql+foobar://user:pass@host/dbname?inline=param&another=param"

def tearDown(self):
settings.engine = self.old_engine
settings.Session = self.old_session
settings.SQL_ALCHEMY_CONN = self.old_conn

@patch('airflow.settings.setup_event_handlers')
@patch('airflow.settings.scoped_session')
@patch('airflow.settings.sessionmaker')
@patch('airflow.settings.create_engine')
def test_configure_orm_with_default_values(self,
mock_create_engine,
mock_sessionmaker,
mock_scoped_session,
mock_setup_event_handlers):
settings.configure_orm()
mock_create_engine.assert_called_once_with(
settings.SQL_ALCHEMY_CONN,
connect_args={},
encoding='utf-8',
max_overflow=10,
pool_pre_ping=True,
pool_recycle=1800,
pool_size=5
)

@patch('airflow.settings.setup_event_handlers')
@patch('airflow.settings.scoped_session')
@patch('airflow.settings.sessionmaker')
@patch('airflow.settings.create_engine')
def test_sql_alchemy_connect_args(self,
mock_create_engine,
mock_sessionmaker,
mock_scoped_session,
mock_setup_event_handlers):
config = {
('core', 'sql_alchemy_connect_args'): 'tests.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS',
('core', 'sql_alchemy_pool_enabled'): 'False'
}
with conf_vars(config):
settings.configure_orm()
mock_create_engine.assert_called_once_with(
settings.SQL_ALCHEMY_CONN,
connect_args=SQL_ALCHEMY_CONNECT_ARGS,
poolclass=NullPool,
encoding='utf-8'
)

@patch('airflow.settings.setup_event_handlers')
@patch('airflow.settings.scoped_session')
@patch('airflow.settings.sessionmaker')
@patch('airflow.settings.create_engine')
def test_sql_alchemy_invalid_connect_args(self,
mock_create_engine,
mock_sessionmaker,
mock_scoped_session,
mock_setup_event_handlers):
config = {
('core', 'sql_alchemy_connect_args'): 'does.not.exist',
('core', 'sql_alchemy_pool_enabled'): 'False'
}
with self.assertRaises(ImportError):
with conf_vars(config):
settings.configure_orm()
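
These tests exercise the [core] sql_alchemy_connect_args option: configure_orm() resolves the configured dotted path with import_string and passes the resulting dict to create_engine(connect_args=...), and an unresolvable path raises ImportError, as the last test asserts. A simplified sketch of that resolution step (the helper name is hypothetical):

    from airflow.configuration import conf
    from airflow.utils.module_loading import import_string

    def resolve_connect_args():
        # e.g. 'tests.test_sqlalchemy_config.SQL_ALCHEMY_CONNECT_ARGS' -> the dict above.
        if conf.has_option('core', 'sql_alchemy_connect_args'):
            return import_string(conf.get('core', 'sql_alchemy_connect_args'))
        return {}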
