Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make extra link work in UI #25500

Merged
merged 1 commit into from
Aug 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@
FrozenSet,
Iterable,
List,
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
Type,
Union,
cast,
)

from airflow.compat.functools import cached_property
Expand Down Expand Up @@ -284,6 +287,16 @@ def global_operator_extra_link_dict(self) -> Dict[str, Any]:
def extra_links(self) -> List[str]:
return list(set(self.operator_extra_link_dict).union(self.global_operator_extra_link_dict))

def unmap(self, resolve: Union[None, Dict[str, Any], Tuple[Context, "Session"]]) -> "BaseOperator":
"""Get the "normal" operator from current abstract operator.

MappedOperator uses this to unmap itself based on the map index. A non-
mapped operator (i.e. BaseOperator subclass) simply returns itself.

:meta private:
"""
raise NotImplementedError()

def get_extra_links(self, ti: "TaskInstance", link_name: str) -> Optional[str]:
"""For an operator, gets the URLs that the ``extra_links`` entry points to.

Expand All @@ -300,13 +313,13 @@ def get_extra_links(self, ti: "TaskInstance", link_name: str) -> Optional[str]:
link = self.global_operator_extra_link_dict.get(link_name)
if not link:
return None
# Check for old function signature

parameters = inspect.signature(link.get_link).parameters
args = [name for name, p in parameters.items() if p.kind != p.VAR_KEYWORD]
if "ti_key" in args:
return link.get_link(self, ti_key=ti.key)
else:
return link.get_link(self, ti.dag_run.logical_date) # type: ignore[misc]
old_signature = all(name != "ti_key" for name, p in parameters.items() if p.kind != p.VAR_KEYWORD)

if old_signature:
return link.get_link(self.unmap(None), ti.dag_run.logical_date) # type: ignore[misc]
return link.get_link(self.unmap(None), ti_key=ti.key)

def render_template_fields(
self,
Expand Down Expand Up @@ -401,8 +414,8 @@ def render_template(
template = jinja_env.from_string(value)
dag = self.get_dag()
if dag and dag.render_template_as_native_obj:
return render_template_as_native(template, context)
return render_template_to_string(template, context)
return render_template_as_native(template, cast(MutableMapping[str, Any], context))
return render_template_to_string(template, cast(MutableMapping[str, Any], context))
Comment on lines -404 to +418
Copy link
Member Author

@uranusjr uranusjr Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated, but Mypy is complaining. context here is implemented as a MutableMapping (although we supply a type stub to say it’s not to prevent users from getting a wrong idea), so this is fine.


if isinstance(value, (DagParam, XComArg)):
return value.resolve(context)
Expand Down
17 changes: 6 additions & 11 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1512,7 +1512,7 @@ def validate_mapped_arguments(cls, **kwargs: Any) -> None:
if cls.mapped_arguments_validated_by_init:
cls(**kwargs, _airflow_from_mapped=True, _airflow_mapped_validation_only=True)

def unmap(self, ctx: Union[None, Dict[str, Any], Tuple[Context, Session]]) -> "BaseOperator":
def unmap(self, resolve: Union[None, Dict[str, Any], Tuple[Context, Session]]) -> "BaseOperator":
""":meta private:"""
return self

Expand Down Expand Up @@ -1766,21 +1766,16 @@ class BaseOperatorLink(metaclass=ABCMeta):
@property
@abstractmethod
def name(self) -> str:
"""
Name of the link. This will be the button name on the task UI.

:return: link name
"""
"""Name of the link. This will be the button name on the task UI."""

@abstractmethod
def get_link(self, operator: AbstractOperator, *, ti_key: "TaskInstanceKey") -> str:
"""
Link to external system.
def get_link(self, operator: BaseOperator, *, ti_key: "TaskInstanceKey") -> str:
"""Link to external system.

Note: The old signature of this function was ``(self, operator, dttm: datetime)``. That is still
supported at runtime but is deprecated.

:param operator: airflow operator
:param ti_key: TaskInstance ID to return link for
:param operator: The Airflow operator object this link is associated to.
:param ti_key: TaskInstance ID to return link for.
:return: link to external system
"""
8 changes: 1 addition & 7 deletions airflow/operators/trigger_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@


if TYPE_CHECKING:
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.taskinstance import TaskInstanceKey


Expand All @@ -51,12 +50,7 @@ class TriggerDagRunLink(BaseOperatorLink):

name = 'Triggered DAG'

def get_link(
self,
operator: "AbstractOperator",
*,
ti_key: "TaskInstanceKey",
) -> str:
def get_link(self, operator: BaseOperator, *, ti_key: "TaskInstanceKey") -> str:
# Fetch the correct execution date for the triggerED dag which is
# stored in xcom during execution of the triggerING task.
when = XCom.get_value(ti_key=ti_key, key=XCOM_EXECUTION_DATE_ISO)
Expand Down
3 changes: 1 addition & 2 deletions airflow/providers/qubole/operators/qubole.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
)

if TYPE_CHECKING:
from airflow.models.abstractoperator import AbstractOperator
from airflow.models.taskinstance import TaskInstanceKey
from airflow.utils.context import Context

Expand All @@ -43,7 +42,7 @@ class QDSLink(BaseOperatorLink):

def get_link(
self,
operator: "AbstractOperator",
operator: BaseOperator,
dttm: Optional[datetime] = None,
*,
ti_key: Optional["TaskInstanceKey"] = None,
Expand Down
8 changes: 6 additions & 2 deletions airflow/www/static/js/dag.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ let subdagId = '';
let dagRunId = '';
let mapIndex;
let mapStates = [];
let extraLinks;
const showExternalLogRedirect = getMetaValue('show_external_log_redirect') === 'True';

const buttons = Array.from(document.querySelectorAll('a[id^="btn_"][data-base-url]')).reduce((obj, elm) => {
Expand Down Expand Up @@ -144,7 +145,7 @@ document.addEventListener('click', (event) => {
export function callModal({
taskId: t,
executionDate: d,
extraLinks,
extraLinks: e,
tryNumber,
isSubDag,
dagRunId: drID,
Expand All @@ -160,6 +161,7 @@ export function callModal({
executionDate = d;
dagRunId = drID;
mapIndex = mi;
extraLinks = e;
if (isMapped) {
mapStates = mappedStates;
}
Expand Down Expand Up @@ -269,7 +271,7 @@ export function callModal({
}
query.delete('try_number');

if (extraLinks && extraLinks.length > 0) {
if (!isMapped && extraLinks && extraLinks.length > 0) {
uranusjr marked this conversation as resolved.
Show resolved Hide resolved
const markupArr = [];
extraLinks.sort();
$.each(extraLinks, (i, link) => {
Expand Down Expand Up @@ -318,6 +320,7 @@ $(document).on('click', '.map_index_item', function mapItem() {
taskId,
executionDate,
dagRunId,
extraLinks,
mapIndex: -1,
isMapped: true,
mappedStates: mapStates,
Expand All @@ -327,6 +330,7 @@ $(document).on('click', '.map_index_item', function mapItem() {
taskId,
executionDate,
dagRunId,
extraLinks,
mapIndex: mi,
});
}
Expand Down
14 changes: 8 additions & 6 deletions airflow/www/static/js/dag/details/taskInstance/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -164,12 +164,14 @@ const TaskInstance = ({ taskId, runId }: Props) => {
</Box>
)}
<Details instance={instance} group={group} />
<ExtraLinks
taskId={taskId}
dagId={dagId || ''}
executionDate={executionDate}
extraLinks={group?.extraLinks || []}
/>
{!isMapped && (
<ExtraLinks
taskId={taskId}
dagId={dagId || ''}
executionDate={executionDate}
extraLinks={group?.extraLinks || []}
/>
)}
{isMapped && taskId && (
<MappedInstances
dagId={dagId}
Expand Down
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,7 @@ unittest
unittests
unix
unmanaged
unmap
unmappable
unmapped
unmapping
Expand Down