
Use base classes for AWS Lambda Operators/Sensors #34890

Merged (3 commits) on Oct 20, 2023

Conversation

@Taragolis (Contributor) commented Oct 12, 2023

closes: #34781

Doc render samples

Generic Parameters
(screenshot)

Configure botocore.config.Config
(screenshot)



@Taragolis Taragolis marked this pull request as ready for review October 12, 2023 19:03
@hussein-awala (Member) left a comment

Nice one!

template_fields: Sequence[str] = (
    "function_name",
    "runtime",
    "role",
    "handler",
    "code",
    "config",
    *AwsBaseOperator.template_fields,
)

Contributor:

Rather than adding this to every operator class, can we do it the other way around, where the base class already has this field with these defaults and we update it with anything new that the concrete class wants to add (if anything)? It would need to be a mutable type, of course; I'm not sure whether immutability is a requirement or not.
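For illustration, one way to sketch this "other way around" idea (the name `extra_template_fields` is hypothetical, not the provider's API) is to merge fields in `__init_subclass__`, which builds a fresh tuple per subclass instead of mutating state shared with the base class:

```python
class AwsBaseOperator:
    template_fields: tuple[str, ...] = ("aws_conn_id", "region_name", "verify")

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Look only in the subclass's own namespace, then build a new tuple,
        # so nothing mutates the base class's fields.
        extra = cls.__dict__.get("extra_template_fields", ())
        cls.template_fields = AwsBaseOperator.template_fields + tuple(extra)


class LambdaCreateFunctionOperator(AwsBaseOperator):
    extra_template_fields = ("function_name", "runtime")


print(LambdaCreateFunctionOperator.template_fields)
# ('aws_conn_id', 'region_name', 'verify', 'function_name', 'runtime')
print(AwsBaseOperator.template_fields)  # unchanged
```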

Contributor Author:

I think we could update it only in the __init__ method; at least I don't know another way to change a class attribute during creation, and I'm not sure it works in every case (especially for mapped tasks).

Another way is to make the parameters a set, but this has a side effect: a set doesn't preserve order, so it might generate a new serialized DAG each time it is parsed (maybe that is not so bad):

class Base:
    templates_fields: set[str] = {"a", "b", "c"}

class Child1(Base):
    ...

class Child2(Base):
    templates_fields = Base.templates_fields | {"d", "e"}

class Child3(Base):
    ...

print(" Step 1 ".center(72, "="))
print(f"Base.templates_fields={Base.templates_fields}")
print(f"Child1.templates_fields={Child1.templates_fields}")
print(f"Child2.templates_fields={Child2.templates_fields} - the order would change from run to run")
print(f"Child3.templates_fields={Child3.templates_fields}")

print(" Step 2: Update one of the child ".center(72, "="))
Child3.templates_fields.update({"f", "g"})

print(f"Base.templates_fields={Base.templates_fields}")
print(f"Child1.templates_fields={Child1.templates_fields}")
print(f"Child2.templates_fields={Child2.templates_fields} - the order would change from run to run")
print(f"Child3.templates_fields={Child3.templates_fields}")

print(" Step 3: Invalid operation ".center(72, "="))
class Child5(Base):
    templates_fields.add("h")  # We can't do that
================================ Step 1 ================================
Base.templates_fields={'b', 'c', 'a'}
Child1.templates_fields={'b', 'c', 'a'}
Child2.templates_fields={'e', 'b', 'd', 'c', 'a'} - the order would change from run to run
Child3.templates_fields={'b', 'c', 'a'}
=================== Step 2: Update one of the child ====================
Base.templates_fields={'b', 'f', 'g', 'c', 'a'}
Child1.templates_fields={'b', 'f', 'g', 'c', 'a'}
Child2.templates_fields={'e', 'b', 'd', 'c', 'a'} - the order would change from run to run
Child3.templates_fields={'b', 'f', 'g', 'c', 'a'}
====================== Step 3: Invalid operation =======================
Traceback (most recent call last):
...
NameError: name 'templates_fields' is not defined

Contributor Author:

I just thought about another option: create a helper function to apply the default parameters.

from airflow.compat.functools import cache

@cache
def aws_template_fields(*templated_fields: str) -> tuple[str, ...]:
    return tuple(sorted({"aws_conn_id", "region_name", "verify"} | set(templated_fields)))


class SomeOperator:
    template_fields: Sequence[str] = aws_template_fields(
        "function_name",
        "runtime",
        "role",
        "handler",
        "code",
        "config",
    )

WDYT?
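For illustration, here is a self-contained sketch of the proposed helper (using stdlib `functools.cache` in place of the Airflow compat shim), showing that the result is sorted, deduplicated, and stable across calls:

```python
from functools import cache


@cache
def aws_template_fields(*templated_fields: str) -> tuple[str, ...]:
    # Merge the common AWS fields with operator-specific ones; sorting makes
    # the order deterministic, unlike iterating over a raw set.
    return tuple(sorted({"aws_conn_id", "region_name", "verify"} | set(templated_fields)))


fields = aws_template_fields("function_name", "code")
print(fields)
# ('aws_conn_id', 'code', 'function_name', 'region_name', 'verify')

# Identical arguments hit the cache and return the very same tuple object,
# so repeated DAG parses see a stable value.
print(aws_template_fields("function_name", "code") is fields)  # True
```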

@ferruzzi (Contributor) commented Oct 16, 2023

A variation on that might be to have each operator define "op_template_fields" and have the base operator's template_fields return base + op. Basically:

class BaseOperator:
    base_template_fields = ("aws_conn_id", "region_name", "verify")

    @cached_property
    def template_fields(self):
        return self.base_template_fields + self.op_template_fields


class MyOperator(BaseOperator):
    op_template_fields = ("foo", "bar")

Contributor Author:

This would also be nice to test with mapped tasks; otherwise we could bump into the same problem we have in BatchOperator.

Contributor:

> I just thought about another option: create a helper function to apply the default parameters.

This is certainly nicer! Less duplication, and the defaults' complexity is hidden from users. It would be nice to have a fully backwards-compatible approach, but I'm happy to settle on this one.

> A variation on that might be to have each operator define "op_template_fields"

This would make the existing operators incompatible, since the field name would need to change. But we need to make other changes to them anyway to convert them to the base-class approach, so maybe that's perfectly fine?

Contributor Author:

I've added this helper in 28aef3b

docs/apache-airflow-providers-amazon/connections/aws.rst (outdated, resolved)
docs/apache-airflow-providers-amazon/operators/lambda.rst (outdated, resolved)
Co-authored-by: Niko Oliveira <onikolas@amazon.com>
@@ -152,7 +150,7 @@ def execute_complete(self, context: Context, event: dict[str, Any] | None = None
         return event["function_arn"]


-class LambdaInvokeFunctionOperator(BaseOperator):
+class LambdaInvokeFunctionOperator(AwsBaseOperator[LambdaHook]):
Contributor:

I'll admit this format for the inheritance is new to me and I am still reading up on how it works, but it appears that the value inside the square brackets here is always the same as the value assigned to aws_hook_class within the operator. I don't suppose there is a way to simplify that so you don't need both?

Contributor Author:

In the square brackets it is just typing.Generic, which helps annotate the type of the hook property. The full chain is:

1. Define a TypeVar:

AwsHookType = TypeVar("AwsHookType", bound=AwsGenericHook)

2. Use Generic in the class definition:

class AwsBaseHookMixin(Generic[AwsHookType]):
    """Mixin class for AWS Operators, Sensors, etc."""

3. Use the defined TypeVar as an annotation:

# Should be assigned in child class
aws_hook_class: type[AwsHookType]

@cached_property
@final
def hook(self) -> AwsHookType:
    ...

In general, class LambdaInvokeFunctionOperator(AwsBaseOperator[LambdaHook]) helps IDEs and static checkers understand that hook will return a LambdaHook object (in our case we use it in only one place), and additionally validates that aws_hook_class is a subclass of LambdaHook.

However, information about generics is not available at runtime and we can't extract it from the class definition, so we have to assign the actual class to aws_hook_class in order to construct the hook in the hook property.
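A minimal, self-contained sketch of this pattern (the stub classes below stand in for the real Airflow hooks): the generic parameter informs static checkers, while aws_hook_class supplies the concrete class at runtime:

```python
from functools import cached_property
from typing import Generic, TypeVar


class AwsGenericHook:  # stand-in for the real Airflow hook hierarchy
    def __init__(self, aws_conn_id: str) -> None:
        self.aws_conn_id = aws_conn_id


class LambdaHook(AwsGenericHook):
    ...


AwsHookType = TypeVar("AwsHookType", bound=AwsGenericHook)


class AwsBaseOperator(Generic[AwsHookType]):
    # The generic parameter only helps static checkers; at runtime the
    # concrete hook class must still be assigned explicitly.
    aws_hook_class: type[AwsHookType]

    @cached_property
    def hook(self) -> AwsHookType:
        return self.aws_hook_class(aws_conn_id="aws_default")


class LambdaInvokeFunctionOperator(AwsBaseOperator[LambdaHook]):
    aws_hook_class = LambdaHook


op = LambdaInvokeFunctionOperator()
print(type(op.hook).__name__)  # LambdaHook
```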

@@ -0,0 +1,68 @@
.. Licensed to the Apache Software Foundation (ASF) under one
Contributor:

Nice addition

@Taragolis Taragolis merged commit a664372 into apache:main Oct 20, 2023
43 checks passed
@Taragolis Taragolis deleted the amazon-lambda-use-base branch October 20, 2023 09:33

Successfully merging this pull request may close these issues.

Airflow Operator LambdaInvokeFunctionOperator should provide option to overwrite read_timeout settings
6 participants