Support multiple buttons from operator_extra_links #28870

Open
1 of 2 tasks
travis-cook-sfdc opened this issue Jan 12, 2023 · 16 comments
@travis-cook-sfdc

travis-cook-sfdc commented Jan 12, 2023

Description

UPDATE (@potiuk): It seems a better idea is to add support for dynamic collections of links to the operator, rather than extending a link to support multiple of them.

BaseOperatorLink should support returning multiple links:

class ButtonLink:
    def __init__(self, name: str, link: str) -> None:
        self.name = name
        self.link = link


class BaseOperatorLink(metaclass=ABCMeta):
    ...
    @abstractmethod
    def get_links(self, operator: BaseOperator, dttm: datetime) -> List[ButtonLink]:
        pass

Use case/motivation

My company has implemented a "MultiExternalTaskSensor" that allows a single task to wait for multiple other tasks to complete.

ExternalTaskSensor automatically supports operator_extra_links to provide a link to the upstream DAG page, but this is impossible to do with Multi... since there's an arbitrary number of buttons that need to show up based on the number of tasks that are being waited on.

Another benefit of allowing this would be that the buttons could support dynamic names, which would allow information about the task (like its state) to be presented in the button text.

This would add an additional challenge while the button data was loading, since the name might not be known. At this point, there could either be a default name fallback or a simple loading spinner while we wait for all button names to return.

There would need to be changes to the /extra_links API and dag.js, but it seems feasible.

It could be implemented along the lines of:

class ButtonLink:

    def __init__(self, name: str, link: str) -> None:
        self.name = name
        self.link = link

class BaseOperatorLink(metaclass=ABCMeta):
    ...

    @abstractmethod
    def get_links(self, operator: BaseOperator, dttm: datetime) -> List[ButtonLink]:
        pass

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@travis-cook-sfdc travis-cook-sfdc added the kind:feature Feature Requests label Jan 12, 2023
@boring-cyborg

boring-cyborg bot commented Jan 12, 2023

Thanks for opening your first issue here! Be sure to follow the issue template!

@potiuk
Member

potiuk commented Jan 12, 2023

As long as it is backwards compatible - sure. If you would like to contribute it, that would be great - it does not pass the bar of being super simple, but for an experienced person it might be an interesting one to learn how things work. Marking as good first issue.

@uranusjr
Member

How is this different from implementing multiple OperatorLink classes for the operator?

@potiuk
Member

potiuk commented Jan 12, 2023

How is this different from implementing multiple OperatorLink classes for the operator?

Ah ... I completely forgot that we can do that.... I guess we can close that one then - if this does not work for you @travis-cook-sfdc , I will close it. We can re-open if you think that using multiple links is not good and have some good justification.

@potiuk potiuk closed this as completed Jan 12, 2023
@travis-cook-sfdc
Author

This is different than implementing multiple OperatorLinks for a few reasons:

  • The number of links required is unknown at instantiation. It's dependent on the instance of the operator and not the generic operator.
  • The button name is hardcoded by the link, but in this case, you'd want it to be dynamic based on the task name.

Imagine this operator:

class ExternalTask:
    dag_id: str
    task_id: str

class MultiExternalTaskSensor(BaseSensorOperator):
    def __init__(self, external_tasks: List[ExternalTask], ...):
        ...

And then the following dags:


dag_1 = DAG(...)

wf_upstreams = MultiExternalTaskSensor(
    task_id="wf_upstreams",
    dag=dag_1,
    external_tasks=[
        ExternalTask(dag_id="upstream_foo", task_id="bar"),
        ExternalTask(dag_id="upstream_bat", task_id="baz")
    ]
)

...

dag_2 = DAG(...)

wf_upstreams = MultiExternalTaskSensor(
    task_id="wf_upstreams",
    dag=dag_2,
    external_tasks=[
        ExternalTask(dag_id="upstream_foo1", task_id="bar1"),
        ExternalTask(dag_id="upstream_foo2", task_id="bar2"),
        ExternalTask(dag_id="upstream_foo3", task_id="bar3"),
    ]
)

When you open the modal for dag_1.wf_upstreams, you should see 2 buttons with the names upstream_foo and upstream_bat.

When you open the modal for dag_2.wf_upstreams, you should see 3 buttons with the names upstream_foo1, upstream_foo2, and upstream_foo3.

AFAIK, BaseOperatorLink doesn't allow you to set a dynamic number of links based on a task instance attribute nor does it allow you to set dynamic button names. I'd be delighted to be wrong though!
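
To make the request concrete, here is a minimal sketch of the behaviour being asked for. It uses plain stand-in classes instead of Airflow's real ones so it runs on its own. `ButtonLink` and `get_links` follow the proposal in the description; `FakeSensor`, `ExternalDagLinks`, and the URL format are hypothetical and only for illustration.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ExternalTask:
    dag_id: str
    task_id: str


@dataclass
class ButtonLink:
    name: str
    link: str


class FakeSensor:
    """Stand-in for MultiExternalTaskSensor; not an Airflow class."""

    def __init__(self, task_id: str, external_tasks: List[ExternalTask]) -> None:
        self.task_id = task_id
        self.external_tasks = external_tasks


class ExternalDagLinks:
    """Hypothetical link class that returns one button per awaited task."""

    def get_links(self, operator: FakeSensor) -> List[ButtonLink]:
        # The URL format below is made up for this example.
        return [
            ButtonLink(name=t.dag_id, link=f"/dags/{t.dag_id}/grid?task_id={t.task_id}")
            for t in operator.external_tasks
        ]


sensor_1 = FakeSensor(
    "wf_upstreams",
    [ExternalTask("upstream_foo", "bar"), ExternalTask("upstream_bat", "baz")],
)
buttons_1 = ExternalDagLinks().get_links(sensor_1)
```

The number of buttons and their names both come from the operator instance, which is the part a fixed list of link classes cannot express.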

@Taragolis
Contributor

@attr.s(auto_attribs=True)
class BigQueryConsoleIndexableLink(BaseOperatorLink):
    """Helper class for constructing BigQuery link."""

    index: int = attr.ib()

    @property
    def name(self) -> str:
        return f"BigQuery Console #{self.index + 1}"

    def get_link(
        self,
        operator: BaseOperator,
        *,
        ti_key: TaskInstanceKey,
    ):
        job_ids = XCom.get_value(key="job_id", ti_key=ti_key)
        if not job_ids:
            return None
        if len(job_ids) < self.index:
            return None
        job_id = job_ids[self.index]
        return BIGQUERY_JOB_DETAILS_LINK_FMT.format(job_id=job_id)

@uranusjr
Member

We can probably add something like

class MyOperator(BaseOperator):
    operator_extra_links = MyOperatorExtraLinkCollection()  # Not a list of links!

to encapsulate this pattern. Probably makes more sense than having a BaseOperatorLink return multiple links.

@potiuk potiuk reopened this Jan 18, 2023
@potiuk
Member

potiuk commented Jan 18, 2023

nice discussion. I see the problem now, and yeah, I agree having support for dynamic collection of links in operator is a better idea

@potiuk
Member

potiuk commented Jan 18, 2023

Updated the description to reflect this

@travis-cook-sfdc
Author

@Taragolis - Interesting example. I thought I had tried something similar, but I ran into problems when I attempted to register a subclass of BaseOperatorLink as an Airflow plugin, since I thought I needed a known cardinality of links. Is the automatic index the magic that makes this work?

@travis-cook-sfdc
Author

After a little more investigation, I think that example only works because the extra link is registered as a provider which doesn't require explicit instantiation. If you want to register it as a plugin, i.e.:

class MultiLinksPlugin(AirflowPlugin):
    name = "extra_link_plugin"
    operator_extra_links = [
        BigQueryConsoleIndexableLink(0),
    ]

You have to specify an index which forces an explicit number of buttons at a time when it's unknown.
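
A rough sketch of that limitation, again with stand-ins rather than Airflow classes: `FAKE_XCOM`, `IndexedLink`, `REGISTERED_LINKS`, and the example URL are all hypothetical. The point is only that a list of pre-built indexed links caps the number of buttons at whatever was registered.

```python
from typing import Dict, List, Optional

# Hypothetical stand-in for XCom: task instance key -> job IDs pushed by the task.
FAKE_XCOM: Dict[str, List[str]] = {
    "dag_1.wf_upstreams": ["job_a", "job_b"],
    "dag_2.wf_upstreams": ["job_c", "job_d", "job_e"],
}


class IndexedLink:
    """Stand-in for an index-based link such as BigQueryConsoleIndexableLink."""

    def __init__(self, index: int) -> None:
        self.index = index

    @property
    def name(self) -> str:
        return f"Console #{self.index + 1}"

    def get_link(self, ti_key: str) -> Optional[str]:
        job_ids = FAKE_XCOM.get(ti_key, [])
        if self.index >= len(job_ids):
            return None  # nothing at this position, so no button
        return f"https://example.invalid/jobs/{job_ids[self.index]}"


# A plugin lists concrete instances, so the count is fixed when it is written.
REGISTERED_LINKS = [IndexedLink(0), IndexedLink(1)]


def visible_links(ti_key: str) -> List[str]:
    """Collect the URLs of every registered link that resolves for this task."""
    links = (link.get_link(ti_key) for link in REGISTERED_LINKS)
    return [url for url in links if url is not None]
```

With two registered links, the task that produced three job IDs still shows only two buttons; raising the cap means guessing an upper bound in advance.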

@travis-cook-sfdc
Author

nice discussion. I see the problem now, and yeah, I agree having support for dynamic collection of links in operator is a better idea

@potiuk - It does seem like operator_extra_links in theory would support a dynamic collection of links.

https://github.com/apache/airflow/blob/main/airflow/models/baseoperator.py#L634

Assuming you write operator_extra_links in your operator class as

@property
def operator_extra_links(self, ...):
    ...
    return (x, y, ...)

I think the issue has to do with how those extra links are registered as plugins but perhaps I'm not understanding how this could be implemented correctly?

@uranusjr
Member

uranusjr commented Jan 19, 2023

Using property is not supported because internally Airflow expects an iterable. Although this gives me an idea, we may be able to detect operator_extra_links is a property and handle that, instead of needing an extra class.

Note that these are (roughly) equivalent in Python:

@property
def operator_extra_links(self, ...): ...

# and

def _get_operator_extra_links(...): ...

operator_extra_links = property(_get_operator_extra_links)

and the latter variant is very similar to instantiating an OperatorExtraLinkCollection syntax proposed above.
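
The equivalence can be checked with plain Python, no Airflow involved. The class names below are made up for the demonstration. It also shows one reason a property is awkward for any code that reads the attribute from the class rather than from an instance: at class level the attribute is a `property` object, which is not iterable.

```python
class WithDecorator:
    def __init__(self, links):
        self._links = links

    @property
    def operator_extra_links(self):
        return tuple(self._links)


def _get_operator_extra_links(self):
    return tuple(self._links)


class WithAssignment:
    def __init__(self, links):
        self._links = links

    # Same effect as the decorator form above.
    operator_extra_links = property(_get_operator_extra_links)


a = WithDecorator(["x", "y"])
b = WithAssignment(["x", "y"])

# On an instance, both spellings yield the per-instance value.
instance_values_match = a.operator_extra_links == b.operator_extra_links

# On the class, the attribute is the property object itself.
class_attr = WithDecorator.operator_extra_links
try:
    iter(class_attr)
    class_attr_iterable = True
except TypeError:
    class_attr_iterable = False
```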

@travis-cook-sfdc
Author

I'm interested in potentially implementing this change, but it's not clear to me exactly what the implementation should look like.

Can anyone provide a summary?

@uranusjr
Member

Assuming we go with the property approach, the first blocker would be operator serialisation; currently we simply assume operator_extra_links is a class-level list:

@classmethod
def _serialize_operator_extra_links(cls, operator_extra_links: Iterable[BaseOperatorLink]):
    """
    Serialize Operator Links.

    Store the import path of the OperatorLink and the arguments passed to it.
    For example:
    ``[{'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink': {}}]``

    :param operator_extra_links: Operator Link
    :return: Serialized Operator Link
    """
    serialize_operator_extra_links = []
    for operator_extra_link in operator_extra_links:
        op_link_arguments = cattr.unstructure(operator_extra_link)
        if not isinstance(op_link_arguments, dict):
            op_link_arguments = {}

        module_path = (
            f"{operator_extra_link.__class__.__module__}.{operator_extra_link.__class__.__name__}"
        )
        serialize_operator_extra_links.append({module_path: op_link_arguments})

    return serialize_operator_extra_links

Here is a report where a property object crashes on serialisation: #25243

Everything else should be easy enough if we can fix this.
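
As a rough illustration of what property-aware serialisation might look like, here is a simplified sketch. It is not Airflow's serializer: it uses the standard library's `dataclasses` in place of attrs/cattr, and `IndexedLink`, `StaticOperator`, `DynamicOperator`, and `serialize_links` are hypothetical names. The assumption is that the links are read from the operator instance, so a property is resolved to its value before the loop runs.

```python
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict, List


@dataclass
class IndexedLink:
    """Stand-in link class whose constructor argument has to be stored."""

    index: int


class StaticOperator:
    # Class-level collection, the shape the current code assumes.
    operator_extra_links = (IndexedLink(0),)


class DynamicOperator:
    def __init__(self, count: int) -> None:
        self.count = count

    @property
    def operator_extra_links(self):
        # Per-instance collection whose size depends on the operator's arguments.
        return tuple(IndexedLink(i) for i in range(self.count))


def serialize_links(operator: Any) -> List[Dict[str, dict]]:
    # getattr on the instance resolves a property to its value, so the same
    # loop handles both a class-level tuple and a per-instance property.
    links = getattr(operator, "operator_extra_links", ())
    serialized = []
    for link in links:
        args = asdict(link) if is_dataclass(link) else {}
        path = f"{type(link).__module__}.{type(link).__qualname__}"
        serialized.append({path: args})
    return serialized
```

Each entry keeps the import path plus the constructor arguments, mirroring the shape described in the docstring above, so an indexed link can be rebuilt with the same index on deserialisation.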

@eladkal
Contributor

eladkal commented Mar 15, 2024

Noting that #25243 is resolved.
