-
Notifications
You must be signed in to change notification settings - Fork 14.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Redis task handler #31855
Merged
Merged
Add Redis task handler #31855
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from __future__ import annotations | ||
|
||
import logging | ||
from functools import cached_property | ||
from typing import Any | ||
|
||
from redis import Redis | ||
|
||
from airflow.configuration import conf | ||
from airflow.models import TaskInstance | ||
from airflow.providers.redis.hooks.redis import RedisHook | ||
from airflow.utils.log.file_task_handler import FileTaskHandler | ||
from airflow.utils.log.logging_mixin import LoggingMixin | ||
|
||
|
||
class RedisTaskHandler(FileTaskHandler, LoggingMixin): | ||
""" | ||
RedisTaskHandler is a Python log handler that handles and reads task instance logs. | ||
It extends airflow FileTaskHandler and uploads to and reads from Redis. | ||
|
||
:param base_log_folder: | ||
base folder to store logs locally | ||
:param max_lines: | ||
Maximum number of lines of log to store | ||
If omitted, this is 10000. | ||
:param ttl_seconds: | ||
Maximum number of seconds to store logs | ||
If omitted, this is the equivalent of 28 days. | ||
:param conn_id: | ||
Airflow connection ID for the Redis hook to use | ||
If omitted or None, the ID specified in the option logging.remote_log_conn_id is used. | ||
""" | ||
|
||
trigger_should_wrap = True | ||
|
||
def __init__( | ||
self, | ||
base_log_folder: str, | ||
max_lines: int = 10000, | ||
ttl_seconds: int = 60 * 60 * 24 * 28, | ||
conn_id: str | None = None, | ||
): | ||
super().__init__(base_log_folder) | ||
self.handler: _RedisHandler | None = None | ||
self.max_lines = max_lines | ||
self.ttl_seconds = ttl_seconds | ||
self.conn_id = conn_id if conn_id is not None else conf.get("logging", "REMOTE_LOG_CONN_ID") | ||
|
||
@cached_property | ||
def conn(self): | ||
return RedisHook(redis_conn_id=self.conn_id).get_conn() | ||
|
||
def _read( | ||
self, | ||
ti: TaskInstance, | ||
try_number: int, | ||
metadata: dict[str, Any] | None = None, | ||
): | ||
log_str = b"\n".join( | ||
self.conn.lrange(self._render_filename(ti, try_number), start=0, end=-1) | ||
).decode() | ||
return log_str, {"end_of_log": True} | ||
|
||
def set_context(self, ti: TaskInstance): | ||
super().set_context(ti) | ||
self.handler = _RedisHandler( | ||
self.conn, | ||
key=self._render_filename(ti, ti.try_number), | ||
max_lines=self.max_lines, | ||
ttl_seconds=self.ttl_seconds, | ||
) | ||
self.handler.setFormatter(self.formatter) | ||
|
||
|
||
class _RedisHandler(logging.Handler): | ||
def __init__(self, conn: Redis, key: str, max_lines: int, ttl_seconds: int): | ||
super().__init__() | ||
self.conn = conn | ||
self.key = key | ||
self.max_lines = max_lines | ||
self.ttl_seconds = ttl_seconds | ||
|
||
def emit(self, record): | ||
p = self.conn.pipeline() | ||
p.rpush(self.key, self.format(record)) | ||
p.ltrim(self.key, start=-self.max_lines, end=-1) | ||
p.expire(self.key, time=self.ttl_seconds) | ||
p.execute() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
.. Licensed to the Apache Software Foundation (ASF) under one | ||
or more contributor license agreements. See the NOTICE file | ||
distributed with this work for additional information | ||
regarding copyright ownership. The ASF licenses this file | ||
to you under the Apache License, Version 2.0 (the | ||
"License"); you may not use this file except in compliance | ||
with the License. You may obtain a copy of the License at | ||
|
||
.. http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
.. Unless required by applicable law or agreed to in writing, | ||
software distributed under the License is distributed on an | ||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
KIND, either express or implied. See the License for the | ||
specific language governing permissions and limitations | ||
under the License. | ||
|
||
.. _write-logs-redis: | ||
|
||
Writing logs to Redis | ||
--------------------- | ||
|
||
Airflow can be configured to store log lines in Redis up to a configured maximum log lines, always keeping the most recent, up to a configured TTL. This deviates from other existing task handlers in that it accepts a connection ID. | ||
This allows it to be used in addition to other handlers, and so allows a graceful/reversible transition from one logging system to another. This is particularly useful in situations that use Redis as a message broker, where additional infrastructure isn't desired. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
from __future__ import annotations | ||
|
||
import logging | ||
from unittest.mock import patch | ||
|
||
import pytest | ||
|
||
from airflow.models import DAG, DagRun, TaskInstance | ||
from airflow.operators.empty import EmptyOperator | ||
from airflow.providers.redis.log.redis_task_handler import RedisTaskHandler | ||
from airflow.utils.session import create_session | ||
from airflow.utils.state import State | ||
from airflow.utils.timezone import datetime | ||
from tests.test_utils.config import conf_vars | ||
|
||
|
||
class TestRedisTaskHandler: | ||
@pytest.fixture | ||
def ti(self): | ||
date = datetime(2020, 1, 1) | ||
dag = DAG(dag_id="dag_for_testing_redis_task_handler", start_date=date) | ||
task = EmptyOperator(task_id="task_for_testing_redis_log_handler", dag=dag) | ||
dag_run = DagRun(dag_id=dag.dag_id, execution_date=date, run_id="test", run_type="scheduled") | ||
|
||
with create_session() as session: | ||
session.add(dag_run) | ||
session.commit() | ||
session.refresh(dag_run) | ||
|
||
ti = TaskInstance(task=task, run_id=dag_run.run_id) | ||
ti.dag_run = dag_run | ||
ti.try_number = 1 | ||
ti.state = State.RUNNING | ||
|
||
yield ti | ||
|
||
with create_session() as session: | ||
session.query(DagRun).delete() | ||
|
||
@conf_vars({("logging", "remote_log_conn_id"): "redis_default"}) | ||
def test_write(self, ti): | ||
handler = RedisTaskHandler("any", max_lines=5, ttl_seconds=2) | ||
handler.set_context(ti) | ||
logger = logging.getLogger(__name__) | ||
logger.addHandler(handler) | ||
|
||
key = ( | ||
"dag_id=dag_for_testing_redis_task_handler/run_id=test" | ||
+ "/task_id=task_for_testing_redis_log_handler/attempt=1.log" | ||
) | ||
|
||
with patch("redis.Redis.pipeline") as pipeline: | ||
logger.info("Test log event") | ||
|
||
pipeline.return_value.rpush.assert_called_once_with(key, "Test log event") | ||
pipeline.return_value.ltrim.assert_called_once_with(key, start=-5, end=-1) | ||
pipeline.return_value.expire.assert_called_once_with(key, time=2) | ||
pipeline.return_value.execute.assert_called_once_with() | ||
|
||
@conf_vars({("logging", "remote_log_conn_id"): "redis_default"}) | ||
def test_read(self, ti): | ||
handler = RedisTaskHandler("any") | ||
handler.set_context(ti) | ||
logger = logging.getLogger(__name__) | ||
logger.addHandler(handler) | ||
|
||
key = ( | ||
"dag_id=dag_for_testing_redis_task_handler/run_id=test" | ||
+ "/task_id=task_for_testing_redis_log_handler/attempt=1.log" | ||
) | ||
|
||
with patch("redis.Redis.lrange") as lrange: | ||
lrange.return_value = [b"Line 1", b"Line 2"] | ||
logs = handler.read(ti) | ||
|
||
assert logs == ([[("", "Line 1\nLine 2")]], [{"end_of_log": True}]) | ||
lrange.assert_called_once_with(key, start=0, end=-1) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change look out of place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is true - but it allows the lower level handler,
_RedisHandler
to not inherit fromlogging.FileHandler
, which I think is appropriate since there is no file involved(?)It looks like the codebase already doesn't enforce that the lower level handler extends from
logging.FileHandler
-airflow/airflow/providers/amazon/aws/log/cloudwatch_task_handler.py
Line 67 in 2940b9f
This touches on the bits that I'm unsure about - it looks like
FileTaskHandler
has a lot of file-related things, but also it looks like it's expected to extend from it when not logging to a file. Not quite sure how to tackle thatThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should have been more specific initially. :)
What I meant was that maybe this change is not required by this PR? since you have anyway overridden the
self.handler
in class RedisTaskHandler. Also,FileTaskHandler
is used in many places, and changing it here might affect those implementations.About inheriting from
FileTaskHandler
I'm not sure either, maybe @bolkedebruin might have more context.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah - so if I change it back to how it was before this PR, I get a type error:
So something else would have to change too. The easiest way that I've come up with is to de-facto disable type checking on
self.handler
by removing type annotations in a few places, to make it more like the Cloudwatch handler, but that seems a bit... cheat-y._RedisHandler
could really inherit fromlogging.FileHandler
to satisfy type checking, but its constructor seems to require a path to a file, which seems odd in this case.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change is OK to me since the base class only really needs to inner handler to be a
logging.Handler
. It could be moved to a separate PR but since this is where the change becomes necessary (instead of just a generally good thing to do) it’s fine to be included here.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this specific change is not tightly coupled to this PR I would suggest to do it in a separated PR.
this is because utils/log goes to core release cycle while the other files goes to provider release cycle.
@michalc Is it possible to separate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@michalc can you check my above comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eladkal Sounds reasonable to separate to another PR. Will do that now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eladkal Done in #32773