Replies: 2 comments 2 replies
-
I finally managed to make it work. The question now is: would this new operator be accepted into Airflow? The operators, triggers and hook are fully tested; below is a screenshot from SonarQube with the coverage. I've tried to implement it as cleanly as possible following clean code principles, even though I'm sure there is still room for improvement. I've also added docstrings to the classes explaining the parameters. Below is a snippet showing how the Async MS Graph Operator can be used:
As you can see, the first operator, named users_delta_task in the test_msgraph_sdk DAG, invokes an expression which returns all the delta user pages. Because we specified the trigger_dag_id argument, each paged result is transferred directly to the DAG 'process_users_delta'. This means events don't accumulate within the operator until it completes, which is memory efficient.

If, on the other hand, you don't want to trigger another DAG to process each paged result but you still want to get the processed events, you can set the keep_events parameter to True (the default is False). The operator will then keep track of each processed event (e.g. each paged result) and return all of them once it has finished processing. The latter will of course consume more memory, but it allows you to work in the "classic way" and can be handy if you know that there won't be too much data.

There is also an example of how to get a SharePoint site via its URL, and of how to get the pages related to that SharePoint site. Some parameters of the operator (expression, conn_id and trigger_dag_id) are templated, which means, as you can see in the example above, that you can use Jinja expressions.
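The trade-off between the two modes can be illustrated outside Airflow with a minimal sketch. The names `process_pages`, `on_page` and `fake_pages` are hypothetical, and only `keep_events` mirrors the operator parameter. This is not the operator's actual code, just the idea of forwarding each page versus accumulating them:

```python
from typing import Callable, Iterable, Iterator, List, Optional

Page = dict


def process_pages(
    pages: Iterable[Page],
    on_page: Optional[Callable[[Page], None]] = None,
    keep_events: bool = False,
) -> Optional[List[Page]]:
    """Forward each page as it arrives; accumulate only if keep_events is True."""
    kept: List[Page] = []
    for page in pages:
        if on_page is not None:
            on_page(page)  # hypothetical hook, e.g. hand the page to another DAG
        if keep_events:
            kept.append(page)  # memory grows with the number of pages
    return kept if keep_events else None


def fake_pages(count: int) -> Iterator[Page]:
    # Stand-in for paged results; a real source would call the API.
    for i in range(count):
        yield {"page": i, "value": [f"user-{i}"]}


forwarded: List[Page] = []
streamed = process_pages(fake_pages(3), on_page=forwarded.append)
collected = process_pages(fake_pages(3), keep_events=True)
```

In the first call nothing is retained by `process_pages` itself, so its memory use stays flat regardless of the number of pages. In the second call every page is held until the end.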
-
We have now published the provider on PyPI: https://pypi.org/project/apache-airflow-providers-msgraph/
-
Hello, I'm building a custom async MSGraphSDKAsyncOperator which uses the official msgraph-sdk-python library from Microsoft. I've implemented a hook, a trigger and an operator. I want the operator to work asynchronously because msgraph-sdk-python is itself implemented asynchronously, which in my opinion is a good thing but makes things a bit harder.
I've succeeded in the implementation, and I hope that once it's finished I will be able to contribute this operator to Airflow, but I have one issue which I don't understand. When making calls with the client, you may receive a paged response, so my idea was that in that case the trigger would automatically request the next pages until there are none left. The operator defers to the trigger.
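To make the paging idea concrete, here is a minimal, self-contained sketch of following next-page links with an async generator. It is not my actual implementation: `FAKE_PAGES`, `fake_fetch`, `iterate_pages` and `collect` are hypothetical names, and the fetch function is a stand-in for the real client. The only thing taken from Microsoft Graph is the `@odata.nextLink` property that points to the next page.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Optional

# Hypothetical stand-in for paged responses keyed by URL.
FAKE_PAGES: Dict[str, Dict[str, Any]] = {
    "page-1": {"value": [1, 2], "@odata.nextLink": "page-2"},
    "page-2": {"value": [3, 4], "@odata.nextLink": "page-3"},
    "page-3": {"value": [5]},  # last page: no nextLink
}


async def fake_fetch(url: str) -> Dict[str, Any]:
    await asyncio.sleep(0)  # simulate an I/O round trip
    return FAKE_PAGES[url]


async def iterate_pages(
    first_url: str,
    fetch: Callable[[str], Awaitable[Dict[str, Any]]],
) -> AsyncIterator[Dict[str, Any]]:
    """Yield each page, then follow @odata.nextLink until it is absent."""
    url: Optional[str] = first_url
    while url is not None:
        page = await fetch(url)
        yield page
        url = page.get("@odata.nextLink")


async def collect(first_url: str) -> List[List[int]]:
    return [page["value"] async for page in iterate_pages(first_url, fake_fetch)]


values = asyncio.run(collect("page-1"))
```

Each iteration performs exactly one fetch, so a consumer that stops early never requests the remaining pages.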
Below is a snippet of the trigger's run method as I implemented it:
As you can see, I implemented an AsyncIterator named ODataIterator which handles the paged response and requests the next page on each async iteration. I've tested the ODataIterator and it works fine. The problem is that after the second iteration the trigger is stopped by an asyncio.CancelledError, and I don't understand why: no exception is being thrown, and no timeout has been specified, so that can't be the cause either. When I run the same method outside Airflow, it completes successfully and goes through all the paged responses (it takes a while to complete as there are a lot of them). This is the code in Airflow where the exception happens, in the triggerer_job_runner.py module:
As you can see above, when an asyncio.CancelledError occurs, the exception is re-raised and of course the trigger is stopped. I've skimmed through the source code of the operators provided with Airflow hoping to find other examples where multiple events are fired through an AsyncIterator, but I couldn't find any. Maybe what I'm trying here isn't the right way to do it in Airflow. I've also tried an alternative approach, deferring each successive paged call, which actually worked. But then I wonder what the purpose of the AsyncIterator for events is if you're only allowed to fire at most 2 events?
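One way an asyncio.CancelledError can surface without any exception or timeout in the iterator itself is when whoever consumes the events cancels the task. Whether that is what happens in the triggerer here is an assumption on my part, not a confirmed diagnosis. The sketch below uses plain asyncio only (no Airflow code, all names hypothetical) and shows the error appearing at the `await` inside the generator once the consuming task is cancelled after the first event:

```python
import asyncio
from typing import AsyncIterator, List

log: List[str] = []


async def events() -> AsyncIterator[int]:
    """Async generator that would yield three events if left alone."""
    try:
        for i in range(3):
            await asyncio.sleep(0.01)  # stand-in for fetching the next page
            yield i
    except asyncio.CancelledError:
        # Raised at the pending await, not by the generator's own logic.
        log.append("cancelled inside generator")
        raise


async def consume(first_seen: asyncio.Event) -> None:
    async for event in events():
        log.append(f"event {event}")
        first_seen.set()


async def main() -> None:
    first_seen = asyncio.Event()
    task = asyncio.create_task(consume(first_seen))
    await first_seen.wait()  # wait until the first event was handled
    task.cancel()            # the consumer side decides it is done
    try:
        await task
    except asyncio.CancelledError:
        log.append("task cancelled")


asyncio.run(main())
```

Run standalone with nobody cancelling the task, the same generator goes through all three events, which matches what I see outside Airflow.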
To write my async operator, I got inspired by the following article on betterprogramming.
Below is the integration test, which runs the code and successfully goes through all 11 pages: