Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic usage of an executor or alike (Pool, Connection, Transaction) that allows to use the top level transaction when requesting a transaction #351

Closed
elertan opened this issue May 30, 2020 · 12 comments

Comments

@elertan
Copy link

elertan commented May 30, 2020

I am trying to create functions that have some functionality, that needs to fetch or mutate the database based on some logic. Sometimes this requires a transaction, sometimes it just needs a connection to fetch some data with. This works great by just passing in connection pool.
But what do I do when I want to nest these functions? Here's a simple (maybe non practical) POC.

async fn log(message: &str, pool: Pool<PgConnection>) -> Result<_, _> {
     let mut tx = pool.begin().await?;
     sqlx::query!("INSERT INTO tbl_log (message) VALUES ($1);", message).await?;
     tx.commit().await?;
     Ok(())
}

async fn create_user(name: &str, pool: Pool<PgConnection>) -> Result<_, _> {
     let mut tx = pool.begin().await?;
     log("Creating user...", &mut tx).await?;
     sqlx::query!("INSERT INTO tbl_user (name) VALUES ($1);", name).await?;
     log("Created user!", &mut tx).await?;
     tx.commit().await?;
     Ok(())
}

async fn main() -> Result<_, _> {
     let pool = ...;
     log("Starting app...", pool.clone()).await?;
     create_user("John Doe", pool.clone()).await?;
     Ok(())
}

So what I'd want to happen is:

  1. First log call gets invoked, requesting a new transaction
  2. Log call commits.
  3. Create user gets invoked, requesting a new transaction
  4. Log gets invoked, reusing transaction from the create user call
  5. Log gets invoked, reusing transaction from the create user call
  6. Create user commits, commit all log calls that were also executed on this transaction

I've looked into using a generic as such

fn foo<'a, E>(executor: E)
where E: Executor<Database = Postgres> + RefExecutor<'a, Database = Postgres> {
   ...
}

which does work for abstracting the pool and connection, but doesn't give the ability to begin a new transaction or continue with a current ongoing one.

Any ideas?

@elertan elertan changed the title Generic usage of an executor (Pool, Connection, Transaction) that allows to use the top level transaction when requesting a transaction Generic usage of an executor or alike (Pool, Connection, Transaction) that allows to use the top level transaction when requesting a transaction May 30, 2020
@AzureMarker
Copy link

It looks like you could use the Connection trait instead. That provides a begin method and has Executor as a super-trait.

@mehcode
Copy link
Member

mehcode commented May 31, 2020

Transactions can now be started from &mut Connection as opposed to requiring an owned connection.

This combined with the existing ability of transactions to derefence into connections should allow for this.

@elertan
Copy link
Author

elertan commented May 31, 2020

Does this solution have other downsides though? The docs suggest that using Pool over a sticky connection, of course a single connection would be required for transactional operations, but for simple data fetching, it would work with a pool as well?
I'll try this out to see if I can get this to work, thanks @Mcat12 @mehcode !

@elertan
Copy link
Author

elertan commented May 31, 2020

I'm still having issues passing around the &mut Connection.
I've made a sample project to show this issue:
https://github.com/elertan/sqlx-connection-parameter

This errors occurs when trying to use an inner transaction.

error[E0507]: cannot move out of `*conn` which is behind a mutable reference
  --> src/main.rs:31:18
   |
31 |     let mut tx = conn.begin().await?;
   |                  ^^^^ move occurs because `*conn` has type `C`, which does not implement the `Copy` trait

I've also had an error without the inner transaction, changing the create_user fn to this

async fn create_user<C>(name: &str, conn: &mut C) -> Result<(), sqlx::Error>
    where C: Connection<Database = Postgres> {
    log("Creating user...", conn).await?;

    sqlx::query("SELECT 1")
        .execute(conn)
        .await?;

    log("User created.", conn).await?;
    Ok(())
}

produces a move on execute

error[E0382]: borrow of moved value: `conn`
  --> src/main.rs:37:26
   |
29 | async fn create_user<C>(name: &str, conn: &mut C) -> Result<(), sqlx::Error>
   |                                     ---- move occurs because `conn` has type `&mut C`, which does not implement the `Copy` trait
...
34 |         .execute(conn)
   |                  ---- value moved here
...
37 |     log("User created.", conn).await?;
   |                          ^^^^ value borrowed here after move

@AzureMarker
Copy link

Transactions can now be started from &mut Connection as opposed to requiring an owned connection.

I can't find this in the docs or code, but it should solve the compilation problem.

@AzureMarker
Copy link

I've also had an error without the inner transaction

This error is due to Query::execute taking mut executor instead of &mut executor. However, it seems to only need the &mut reference because the underlying Executor::execute method only needs &mut. Perhaps it should take a reference instead @mehcode? There is a quick fix for now:

async fn create_user<C>(name: &str, conn: &mut C) -> Result<(), sqlx::Error>
where
    C: Connection<Database = Postgres>,
{
    log("Creating user...", conn).await?;

    conn.execute(sqlx::query("SELECT 1")).await?;

    log("User created.", conn).await?;
    Ok(())
}

@mehcode
Copy link
Member

mehcode commented May 31, 2020

Sorry I meant the change has been done on master. It's not yet released.

@mehcode
Copy link
Member

mehcode commented May 31, 2020

@elertan
Copy link
Author

elertan commented May 31, 2020

Hope it gets versioned soon then! I'll try the other api as well later @Mcat12 Thanks for the suggestion

@mehcode
Copy link
Member

mehcode commented Jun 1, 2020

Reading up in the thread,

error[E0382]: borrow of moved value: conn

This is because of the generic context. Rust doesn't understand that it can just reborrow the &mut. This can be fixed on nightly with GATs which we'll integrate when they hit stable. A work-around for now is to manually reborrow with:

    sqlx::query("SELECT 1")
        .execute(&mut *conn)
        .await?;

@elertan
Copy link
Author

elertan commented Jun 1, 2020

Reading up in the thread,

error[E0382]: borrow of moved value: conn

This is because of the generic context. Rust doesn't understand that it can just reborrow the &mut. This can be fixed on nightly with GATs which we'll integrate when they hit stable. A work-around for now is to manually reborrow with:

    sqlx::query("SELECT 1")
        .execute(&mut *conn)
        .await?;

Trying to use the manual reborrow still does not work for me, see the code snippet below:

pub async fn create_or_update_address<C>(
    params: &CreateOrUpdateAddressParams,
    conn: &mut C,
) -> Result<CreateOrUpdateAddressResult, ServiceError<CreateOrUpdateAddressError>>
where
    C: Connection<Database = Postgres>,
{
    params.validate().map_err(|err| err.into_service_error())?;

    let result = sqlx::query_as!(
        TblAddress,
        "
SELECT *
FROM tbl_address
WHERE country_id = $1 and
state_or_province_name = $2 and
city_name = $3 and
street_name = $4 and
house_number = $5 and
annex = $6 and
zip_code = $7
",
        params.country_id,
        params.state_or_province_name,
        params.city_name,
        params.street_name,
        params.house_number,
        params.annex,
        params.zip_code
    )
    .fetch_optional(&mut *conn)
    .await
    .map_err(|err| err.into_service_error())?;

    todo!()
}

produces error

error[E0277]: the trait bound `C: std::marker::Copy` is not satisfied
   --> src/services/mod.rs:40:21
    |
 40 |     .fetch_optional(&mut *conn)
    |                     ^^^^^^^^^^ the trait `std::marker::Copy` is not implemented for `C`
    |
 help: consider further restricting this bound with `+ std::marker::Copy`
   --> src/services/mod.rs:15:5
    |
 15 |     C: Connection<Database = Postgres>,
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    = note: required because of the requirements on the impl of `sqlx_core::executor::RefExecutor<'_>` for `&mut C`

 error[E0277]: the trait bound `C: sqlx_core::executor::RefExecutor<'_>` is not satisfied
   --> src/services/mod.rs:40:21
    |
 40 |     .fetch_optional(&mut *conn)
    |                     ^^^^^^^^^^ the trait `sqlx_core::executor::RefExecutor<'_>` is not implemented for `C`
    |
 help: consider further restricting this bound with `+ sqlx_core::executor::RefExecutor<'_>`
   --> src/services/mod.rs:15:5
    |
 15 |     C: Connection<Database = Postgres>,
    |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    = note: required because of the requirements on the impl of `sqlx_core::executor::RefExecutor<'_>` for `&mut C`

As for @Mcat12 's suggested approach, which does work, but the Connection, or rather Executor does not provide methods such as fetch_one, fetch_optional and fetch_all, which are pretty essential imo. Having to stream the rows and transform them manually is a pain sadly.

@mehcode
Copy link
Member

mehcode commented Jun 13, 2020

This is all fixed on master so I'm going to close this. See #361 for tracking on the release.

@mehcode mehcode closed this as completed Jun 13, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants