Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Azure Service Bus Queue Operators #24038

Merged
merged 1 commit into from
Jun 23, 2022

Conversation

bharanidharan14
Copy link
Contributor

@bharanidharan14 bharanidharan14 commented May 31, 2022

Implemented Azure Service Bus Queue based Operator's to create queue, send message to the queue and receive message(list of message or batch message) and delete queue in azure service

  • Added AzureServiceBusCreateQueueOperator
  • Added AzureServiceBusSendMessageOperator
  • Added AzureServiceBusReceiveMessageOperator
  • Added AzureServiceBusDeleteQueueOperator
  • Added Example DAG
  • Added Documentation
  • Added hooks and connection type in - provider yaml file
  • Added unit Test case, doc strings

cc: @kaxil @phanikumv

@bharanidharan14 bharanidharan14 changed the title Add Azure Service Bus Queue Operator's Implement Azure Service Bus Queue Operator's Jun 1, 2022
@bharanidharan14 bharanidharan14 marked this pull request as ready for review June 1, 2022 07:38
ashb
ashb previously requested changes Jun 1, 2022
airflow/providers/microsoft/azure/hooks/asb_message.py Outdated Show resolved Hide resolved
airflow/providers/microsoft/azure/hooks/asb_message.py Outdated Show resolved Hide resolved
airflow/providers/microsoft/azure/hooks/asb_message.py Outdated Show resolved Hide resolved
airflow/providers/microsoft/azure/hooks/asb_message.py Outdated Show resolved Hide resolved
@kaxil
Copy link
Member

kaxil commented Jun 21, 2022

Copy link
Contributor

@josh-fell josh-fell left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM and really close! Just some clarification on file naming and consistency with exceptions.

operations are enabled.
"""
if queue_name is None:
raise ValueError("Queue name cannot be None.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError("Queue name cannot be None.")
raise TypeError("Queue name cannot be None.")

Looks like there is a mix of TypeError and ValueError for the same exception. Let's stay consistent with Ash's initial feedback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving back to raise TypeError

:param queue_name: The name of the queue or a QueueProperties with name.
"""
if queue_name is None:
raise ValueError("Queue name cannot be None.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError("Queue name cannot be None.")
raise TypeError("Queue name cannot be None.")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving back to raise TypeError

:param max_wait_time: Maximum time to wait in seconds for the first message to arrive.
"""
if queue_name is None:
raise ValueError("Queue name cannot be None.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError("Queue name cannot be None.")
raise TypeError("Queue name cannot be None.")

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving back to raise TypeError

self.dead_lettering_on_message_expiration,
self.enable_batched_operations,
)
self.log.info("Created Queue ", queue.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self.log.info("Created Queue ", queue.name)
self.log.info("Created Queue %s", queue.name)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

@@ -0,0 +1,206 @@
# Licensed to the Apache Software Foundation (ASF) under one
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eladkal What do you think about this file being named asb.py instead?

@bharanidharan14 Unless you were thinking about there being some other Service Bus-related grouping of modules in the future that don't deal with messages, queues, etc.?

Copy link
Contributor Author

@bharanidharan14 bharanidharan14 Jun 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@josh-fell there is a draft PR for Azure service bus subscription operators (ASBCreateSubscriptionOperator, ASBUpdateSubscriptionOperator, ASBReceiveSubscriptionMessageOperator, ASBDeleteSubscriptionOperator) which uses the same hooks/asb.py and for operator which I thought I can keep it under asb_subscription.py file

Copy link
Contributor

@eladkal eladkal Jun 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it make sense to group all operators under asb.py I'm OK with it but we don't have to.
My thumb rule for this is - it should be easily discoverable for users and easy to use.

In AWS for example, we grouped the S3 operators together because it's likely that in a single DAG you will need more than 1 operator so importing each operator from different file is annoying.

So the question here: how distinct are queue operators from subscriptions and how likely they are to be used in a single DAG? That is a question only people who use it can answer. Either way - we can always change it with some community feedback.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The message-sending functionality of a queue maps directly to a topic and its message-receiving functionality maps to a subscription, so there is a chance of user using the queue and subscription operators in the same DAG

https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-queues-topics-subscriptions

Authenticating to Azure Service Bus
------------------------------------

There are Multiple ways to authenticate and authorize access to Azure Service Bus resources:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
There are Multiple ways to authenticate and authorize access to Azure Service Bus resources:
There are multiple ways to authenticate and authorize access to Azure Service Bus resources:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed corrected with small letter


.. exampleinclude:: /../../tests/system/providers/microsoft/azure/example_service_bus_queue.py
:language: python
:dedent: 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
:dedent: 0
:dedent: 4

Since the START markers are indented, we should dedent the rendered code snippet in the docs. Same comment applies to the other instances as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

corrected with :dedent: 4

@bharanidharan14 bharanidharan14 force-pushed the asb-queue-operators branch 3 times, most recently from 2c2a5b1 to 33d5797 Compare June 22, 2022 08:15
@bharanidharan14
Copy link
Contributor Author

@josh-fell I have addressed the review comments, please review it

@kaxil
Copy link
Member

kaxil commented Jun 22, 2022

@bharanidharan14 There are test failures:

==== Providers postgres: 3 failures ====
tests/providers/microsoft/azure/hooks/test_asb.py::TestAdminClientHook::test_create_queue_exception: TypeError: Queue name cannot be None.
tests/providers/microsoft/azure/hooks/test_asb.py::TestAdminClientHook::test_delete_queue_exception: TypeError: Queue name cannot be None.
tests/providers/microsoft/azure/hooks/test_asb.py::TestMessageHook::test_receive_message_exception: TypeError: Queue name cannot be None.

@bharanidharan14
Copy link
Contributor Author

@bharanidharan14 There are test failures:

==== Providers postgres: 3 failures ====
tests/providers/microsoft/azure/hooks/test_asb.py::TestAdminClientHook::test_create_queue_exception: TypeError: Queue name cannot be None.
tests/providers/microsoft/azure/hooks/test_asb.py::TestAdminClientHook::test_delete_queue_exception: TypeError: Queue name cannot be None.
tests/providers/microsoft/azure/hooks/test_asb.py::TestMessageHook::test_receive_message_exception: TypeError: Queue name cannot be None.

@kaxil fixed it and pushed it again. Thanks

@github-actions github-actions bot added the full tests needed We need to run full set of tests for this PR to merge label Jun 22, 2022
@github-actions
Copy link

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@bharanidharan14 bharanidharan14 force-pushed the asb-queue-operators branch 3 times, most recently from b89f85e to 34833b8 Compare June 23, 2022 08:20
Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

- Rename file name
- changed the import and got all the sub class into same file to maintain the existing structure
- fixed the testcase

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

- Test case
- Doc string
- Removed airflow exception and added proper error type
- Removed try/expect block
- Renamed the azure service bus queue operator test file

Implement Azure Service Bus Queue Operator's

Add Azure Service Bus Queue Operator's

- Create Queue Operator
- Send message queue Operator
- Receive message queue Operator
- Delete queue Operator
- example DAG
- Added hooks and connection type in - provider yaml file
- Added unit Test case

Update example DAG

Add Operators documentation

- Fixed `connection_string`  to be part of `get_conn()` not as Class attr
- Fixed connection doc
- changed the start date in example DAG
- Added Documentation on the Azure service bus Operators

Add space for the doc string

Fix

- Fixed ValueError to TypeError
- Fixed log info
- corrected `:dedent: 4`

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

- Rename file name
- changed the import and got all the sub class into same file to maintain the existing structure
- fixed the testcase

Implement Azure Service Bus Queue Operator's

Implement Azure Service Bus Queue Operator's

- Test case
- Doc string
- Removed airflow exception and added proper error type
- Removed try/expect block
- Renamed the azure service bus queue operator test file

Implement Azure Service Bus Queue Operator's

Add Azure Service Bus Queue Operator's

- Create Queue Operator
- Send message queue Operator
- Receive message queue Operator
- Delete queue Operator
- example DAG
- Added hooks and connection type in - provider yaml file
- Added unit Test case

Update example DAG

Add Operators documentation

- Fixed `connection_string`  to be part of `get_conn()` not as Class attr
- Fixed connection doc
- changed the start date in example DAG
- Added Documentation on the Azure service bus Operators

Add space for the doc string

Fix

- Fixed ValueError to TypeError
- Fixed log info
- corrected `:dedent: 4`

Add log info

Fix Test case failure

Rename the azure_queue to asb
@kaxil kaxil merged commit 09f38ad into apache:main Jun 23, 2022
@kaxil kaxil deleted the asb-queue-operators branch June 23, 2022 15:52
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Aug 12, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:providers changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) full tests needed We need to run full set of tests for this PR to merge kind:documentation provider:microsoft-azure Azure-related issues
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants