Make pyodbc.Row and databricks.Row JSON-serializable via new make_serializable method #32319
Conversation
Force-pushed from fb0dc89 to e83d5c2
Actually - there is no need to make it backwards-compatible. We could make it a breaking change for the ODBC provider and bump the major version - and if you make it a named tuple with the same attributes as Row, this will be a "not too much breaking" change - it will mostly work for all current users. Also, adding make_serializable in common.sql is not needed in this context, and that's actually very good, because otherwise you would have to add a dependency on a newer version of common.sql to make it work.
I quickly wrapped up an update to this PR. It's not complete yet. Let me suggest this:
I admit I have never tried to json.dumps a namedtuple (will give it a try on Airflow tomorrow). If that works, could we expand the scope of this PR to Databricks - making the transformation to serializable objects at Hook level + solving the consequences of that change in the DatabricksSqlOperator? It's doable with a simple tuple + description, and it's even easier with a namedtuple. I'm suggesting that because I like the solution of adding a serialization method that other hooks can override. Like this, the right tool is added at the right place, and it can be used for both ODBC and Databricks. Otherwise, doing this change without touching the common.sql hook feels like hacking and working around the run method (and it adds a new flavor of this method to the codebase). What about a PR for ODBC and Databricks, with the serializable transformation behind a flag, which will depend on a newer version of the common.sql package? So that it's clean and doesn't break too many things.
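For reference, json.dumps already handles namedtuples out of the box: they are tuple subclasses, so they are encoded as JSON arrays (field names are dropped, values survive). A quick check:

import json
from collections import namedtuple

Row = namedtuple("Row", ["id", "name"])
print(json.dumps([Row(1, "airflow"), Row(2, "databricks")]))
# -> [[1, "airflow"], [2, "databricks"]]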
If we do it like this, then we should add "additional-dependencies" in provider.yaml, add common.sql >= NEXT_COMMON_SQL_VERSION, and bump common.sql to the next MINOR version in its provider.yaml. This would effectively be a new feature of common.sql that those two providers would depend on, so making a new MINOR (i.e. feature) release of common.sql and making those two providers depend on it is the only way to do it. It's possible but a little dangerous, because all other providers that depend on common.sql could start relying on this feature in the future (i.e. add their own make_serializable implementation) without adding the >= - so the only "future-proof" way of adding this would also be to add a pre-commit that enforces that a provider overriding make_serializable also declares the minimum common.sql version. So yeah. It's possible to do it the way you propose - but it will require a bit of build/CI overhead to make it robust and prevent accidental mistakes in the future.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
Force-pushed from aa8dbd1 to e321923
Force-pushed from 4dec7e5 to 27acf50
The PR is ready for review. I also added a static check: it raises an error when a subclass of DbApiHook overrides `_make_serializable` without requiring a recent enough version of the common.sql provider.
Instantiating NamedTuple dynamically is not supported by mypy. python/mypy#848
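For context, the functional NamedTuple form works fine at runtime; mypy only rejects a field list that is not a literal, hence the type: ignore. An illustrative snippet (the column list here is made up):

from typing import NamedTuple

columns = [("id", int), ("name", str)]  # built dynamically, e.g. from cursor description metadata
Row = NamedTuple("Row", columns)  # type: ignore[misc]  # see python/mypy#848
row = Row(1, "airflow")
print(row.id, row.name)  # attribute access works like on a driver row object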
Force-pushed from 70285a4 to 6b3e2d8
This is really cool. It also seems to be fully backwards-compatible. @utkarsharma2 and @Lee-W - WDYT?
Agree! Just read through the latest change again. This is awesome. Makes the code much cleaner. I also think it's fully backwards-compatible.
I missed this - why was the non-standard `_make_serializable` added? This also could have been implemented as a custom serializer, and I am really inclined to have this reverted.
I'm willing to create a new PR and pause/drop this one. But during the implementation of this PR, I could not create/instantiate pyodbc.Row objects (which are C++ objects) directly in Python - I wanted to do it for the unit tests. Thus I assumed it is not possible... (but I never had this case before, and maybe there's something to learn here!)
Assuming creating a deserializer which returns a pyodbc.Row object is not possible, is it okay to return a NamedTuple?
I do not think it has anything to do with "standard" behaviour. This is an internal implementation of DBHooks - each DB hook might make its own decision on how to turn the rows it returns (already commonly structured via common.sql) into a serializable variant. Could you please elaborate what you mean by "missing versioning" and how you want to implement what it does? It is nothing that Airflow's serialization should be concerned about - this is really "standardising" the behaviour of the DBAPI implementation. Python's DBAPI does not have very "strong" guarantees about what is returned, and some of the implementations (like Databricks) chose to return non-serializable objects, while most of the other DBAPI implementations chose to use tuples of rows + tuples of descriptions, which are serializable. The big problem we were trying to solve with common.sql is to introduce a common interface for what all DBHooks will be returning. In this case it's really not even something that IMHO "airflow" serialization should be concerned about (if this is what you are after). This is purely one-way serializing - we just want to make sure that whatever gets returned via DBAPI calls (and essentially via the Hook) is:
a) standard according to what our DBHook should return (i.e. following the tuple structure + tuples of metadata describing the rows)
b) serializable, so that it can be stored and sent via XCom
In this sense - we do not need anything else - like versioning - we just need to find a way to make sure that what gets returned can be sent via XCom. But maybe I misunderstand what you want to achieve @bolkedebruin? Could you please show an example of what you want to get and how you would achieve it?
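To illustrate the "tuples of rows + tuples of descriptions" shape referred to above - a simplified, hedged example (names and values are illustrative only):

import json

# cursor.description-like metadata per column: (name, type) pairs
description = (("id", int), ("name", str))
# rows as plain tuples are JSON-serializable as-is, unlike pyodbc.Row or Databricks Row objects
rows = [(1, "airflow"), (2, "providers")]
print(json.dumps(rows))  # -> [[1, "airflow"], [2, "providers"]]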
Hey @Joffreybvn, thanks for responding, and apologies for being late to the game and being a bit rough. It isn't required to return the same thing during deserialization as is provided through serialization. In other words, it is fine if you serialize a pyodbc.Row and get something else, such as a NamedTuple, back on deserialization. @potiuk All XCom serialization and deserialization goes through serde.
By dropping a serializer/deserializer into the serializers for pyodbc, XCom could handle these objects without touching the provider. BUT if your intention is to have a common "flattened" format, that by specification is serializable by the backend (not just JSON btw), then a renaming of the method might make sense. Then maybe the solution is renaming the method and changing its description, because with a signature like

from airflow.serialization.serde import U

def _make_serializable(result: Any) -> U: ...

it reads as the serialize step of serde. I realize that I might be overzealous and misreading due to all the 'serialize' here, but I did not see any consideration going into implementing this in the framework that is available for this (serde).
For the reason described above - a common "airflow standard" returned output of DBApiHook. All the DBApiHook implementations return some kind of tuple-like output. Some of them also return some kind of description or metadata - sometimes embedded in the returned Row data as extra row content, sometimes not (there is no common standard for that). And `make_serializable` returns a standard output that is, well, serializable, in case the originally returned objects are, well, not serializable... Yes - maybe the name could be better, but naming is the hardest thing in computer science, right next to cache invalidation. I think at this stage holding the release of several providers and common.sql for that is quite a bit too much. Do you think it is worth the extra overhead for the release process? What would be a better name in this case?
Also - I think you are really mixing up the problem here... It's not serialize/deserialize we are talking about here, and IMHO the name reflects the intention very well. We are not trying to SERIALIZE/DESERIALIZE things... We are trying to return a value from DBApiHook that can actually be directly serialized. So "make_serializable" is a very good name (and intent) - it makes whatever gets returned by the external thing a common, serializable format. Pretty good, and it matches the intention.
You are converting from one format to another. This is effectively serialization - which is exactly the job serde is meant to do.
The question again is, why is a "common format" required? Every DbAPI hook that has its own non-serializable format will need to include its own `_make_serializable` implementation. In other words, I currently see the common format as an unnecessary intermediate format for serialization, which complicates matters as it loses versioning and typing information. If there is a use for the common format outside serialization, then I can be convinced otherwise.
To a certain extent, yes, but the signature of the function does not match its intent.
Why is this needed and where is this exposed - outside of serialization? If it is only for serialization and the default implementation is to return the input, then imho it should be solved as a serializer. Can we be more specific about what this common format is needed for?
This is what custom serializers do. Currently, the implementation here does not enforce that.
:-)
I disagree. This is going to be in for a long time, let's make sure we get the design right.
The name is fine, although I could argue for a different one.
We have absolutely no intention of doing it. This is a one-way DBAPI -> returned-value conversion. We have absolutely no need to add the complexity of having (effectively) all DBApi providers depend on serde.
Yes, there is a very good reason we have it. It's mainly because of the OpenLineage integration (even if not implemented now, it's the way for it to be used in the future). Not everything revolves around serialization in Airflow. For DBApiHook, serialization is just an afterthought which is applied much later in the process. Serialization does not make a "standard" description of the returned data. Serialization loses the semantic meaning of the returned data, whereas DBApiHook also returns metadata that could be used to make column lineage semantically meaningful. Serialization is a lower-layer construct that loses the semantic meaning of the data returned by the Hook. This was the original intention of implementing common.sql and standardising what the Hook returns. It's a building block on which a future task-flow implementation of OpenLineage might be built for SQL data processing.
Yes - if you think the whole purpose of the interface is to serialize things. In our case we return a DBAPI-common form of data that merely has the "property" of being serializable. It's an afterthought - merely a property that makes it possible to use the output directly as operator output, so that it can be stored in XCom.
I think we have it right - the DBApiHook, its purpose and reasoning have been discussed there for about 1.5 years and went through several iterations.
To avoid unnecessary coupling - if I had to name a single good reason. Our providers have a deliberately wide range of supported Airflow versions they should be compatible with. We base a lot of assumptions on it. We cannot assume that we have a specific version of Airflow available to us. Using serde and implementing it now in the common.sql provider would mean that we would only be able to use it for Airflow 2.8, which would support it - and possibly even rely on some internal stuff in serde - and it would require a set of unit tests testing compatibility of various provider versions with various Airflow versions. It is a very bad idea to couple the two when all that we are talking about is converting "specific" objects returned by Databricks in the "serde" module that serializes them, relying - possibly - on a specific serde version in a specific Airflow version. That would effectively make the serde API and behaviour a public API of Airflow that ALL DBHook providers should depend on - this coupling is not only unnecessary but also harmful and will slow us down. Serde is not (and please correct me if I am wrong) part of the public API of Airflow. Are we willing to make 3rd-party providers depend on serde? What APIs? Are they already public in https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html ? Because what you are really asking for is to make 3rd-party providers that would like to implement a DBApiHook depend on it. I personally think it's a bad idea to add this coupling. Now that I explained why not, I have one question that maybe you can answer: could you please explain what benefit implementing serde for this conversion would bring? I am looking for some good reasons why and, to be honest, cannot find them. But since I explained why not, I'd love to hear why.
While I think that extra metadata can be beneficial, it is not part of the spec at the moment. The ODBC implementation does include it, the Databricks hook doesn't. There is, btw, no reason that serialization by the standard framework can't do this.
Well, I am happy that you explained this, because I could not get that from the initial issue, the commits here, or the comments (even when re-reading them now). The PR starts with "Thus, I propose to add method in the DBApiHook to make the result of a query serializable". So yes, I read it as "do serialization". Sorry for that.
As mentioned above, indeed the PR reads this way.
Nitpicking: I don't see any reference to that discussion here. And maybe that is why I was pointed in the wrong direction.
The coupling wouldn't be explicit. A new Airflow release - yes, that would be required at the moment (there is some thought of moving serializers to their respective providers). No imports and no changes required to the provider code, though. Fewer unit tests actually, or the same.
Thanks for the consideration. Good point. You are correct that it is not in the public API, although you could argue that the format of serialization is, and being able to serialize/deserialize by definition also. Nevertheless, there is no direct use of the serde API required. So maybe this point is kind of moot?
While the above points convince me the implementation is okay, for the sake of completeness, and assuming no need for an intermediate format (lineage), this is roughly what a serde-based serializer would look like:
from __future__ import annotations

from typing import TYPE_CHECKING, Any, NamedTuple, cast

from airflow.utils.module_loading import qualname

if TYPE_CHECKING:
    from airflow.serialization.serde import U

__version__ = 1


def serialize(o: object) -> tuple[U, str, int, bool]:
    import pyodbc

    o = cast(pyodbc.Row, o)
    # keep only (name, type) from the cursor_description metadata
    columns: list[tuple[str, type]] = [col[:2] for col in o.cursor_description]
    row_object = NamedTuple("Row", columns)  # type: ignore[misc]
    row = row_object(*o)
    return row, qualname(o), __version__, True


def deserialize(classname: str, version: int, data: object) -> Any:
    import pyodbc

    if version > __version__:
        raise TypeError("serialized version is newer than class version")
    if classname == qualname(pyodbc.Row):
        # pyodbc.Row cannot be instantiated from Python, so return the stored data as-is
        return data
    raise TypeError(f"do not know how to deserialize {classname}")

And that is all there is to it - apart from tests. No need to adjust the providers, but yes, an Airflow update would be required. @Joffreybvn sorry for the fuss! Good work.
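For completeness: if that route were taken, the module would presumably also need the module-level registration attributes that serde scans for. The exact attribute names below follow the convention used by the existing serializer modules and are an assumption here:

# assumed serde registration convention: qualified class names this module handles
serializers = ["pyodbc.Row"]
deserializers = serializers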
def _make_serializable(result):
    """Transform the databricks Row objects into a JSON-serializable list of rows."""
    if result is not None:
        return [list(row) for row in result]
Hey @Joffreybvn, I feel if the handler callable returns a non-iterable then this would fail.
I call the run method like below and it's breaking my DAG:
run(f"SELECT COUNT(*) FROM {table}", handler=lambda x: x.fetchone())
Error
packages/airflow/providers/databricks/hooks/databricks_sql.py", line 247, in <listcomp>
return [list(row) for row in result]
TypeError: 'int' object is not iterable
Correct. Unfortunately, the DBApiHook - for backwards-compatibility reasons - makes it pretty difficult to handle all cases, including the case where we either return a list of results or one result only.
I tried to explain it in https://github.com/apache/airflow/blob/main/airflow/providers/common/sql/doc/adr/0002-return-common-data-structure-from-dbapihook-derived-hooks.md#decision (including all the possible variants of returned values), and I think our handler definition is not precise enough to explain that this can happen (we were not as much into typing back then as we are now).
@Joffreybvn -> I think this means we have to hold on with the databricks/pyodbc implementation to add support for those cases. Will you have time to fix it quickly?
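A hedged sketch of the direction such a fix could take - handling both a list of rows and a single row from a fetchone-style handler. The actual provider fix may differ, and the Row import path is an assumption:

from databricks.sql.types import Row  # assumed import path for the Databricks Row class

def _make_serializable(result):
    """Transform Databricks Row objects into JSON-serializable values.

    Handles lists of rows (fetchall-style handlers), a single row
    (fetchone-style handlers), and scalar/None results.
    """
    if isinstance(result, list):
        return [list(row) if isinstance(row, Row) else row for row in result]
    if isinstance(result, Row):
        return list(result)
    return result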
The ODBCHook returns pyodbc.Row objects when used with SQLExecuteQueryOperator to do SELECT queries, which cause serialization errors in the XCom backend. This PR follows the discussion started here.
A good place to implement the transformation of Row into tuples is after the execution of the handler, in the run() method. There, the raw data structure is available. Doing it later would imply dealing with potentially nested structures (e.g. lists of results). Thus, I propose to add a method in the DbApiHook to make the result of a query serializable, so that subclasses can override it to implement their own logic (a rough sketch of the idea follows at the end of this description).
I see that many Hooks based on DbApiHook have a custom run method. But considering @potiuk's comment about "the most standard the better", I propose to go for an extra internal method that Hooks can override, rather than copying the full run() method just to implement a small change. Considering also that pyodbc.Row may not be the only case where a custom object causes issues (Databricks could be rewritten to handle that issue at Hook level). And considering that this pattern already exists with the serialize_cell method, which gets overridden by child hooks.
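A rough, simplified sketch of the shape of that proposal. The class and method bodies below are heavily trimmed stand-ins (including the hypothetical _execute helper), not the actual common.sql code:

from typing import Any, Callable, Optional


class DbApiHook:
    """Heavily simplified stand-in for the common.sql DbApiHook."""

    @staticmethod
    def _make_serializable(result: Any) -> Any:
        # Default behaviour: assume the result is already serializable.
        return result

    def run(self, sql: str, handler: Optional[Callable[[Any], Any]] = None) -> Any:
        cursor = self._execute(sql)  # placeholder for the real connection/cursor handling
        if handler is None:
            return None
        # The transformation happens right after the handler, where the raw
        # driver-specific structure is still available.
        return self._make_serializable(handler(cursor))

    def _execute(self, sql: str) -> Any:  # hypothetical helper, not part of the real hook
        raise NotImplementedError


class OdbcHook(DbApiHook):
    @staticmethod
    def _make_serializable(result: Any) -> Any:
        # Turn driver rows (e.g. pyodbc.Row) into plain tuples so XCom can store them.
        if result is None:
            return None
        if isinstance(result, list):
            return [tuple(row) for row in result]
        return tuple(result)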