
Improper isolation using async_transaction? #26

Closed
Parth opened this issue Jan 2, 2022 · 2 comments

Parth commented Jan 2, 2022

I've spent the last two days diving into the details of Redis connections and how they relate to transactions, trying to figure out the right way to set up an async web server that talks to Redis. Along the way I've learned about the various connection strategies available (bb8, deadpool, and MultiplexedConnection). I also came across your comment, and after some testing I've determined that async_transaction! is only safe to use if it has exclusive ownership of the connection it's using.

In this project, however, I'm pretty sure that using a ConnectionManager like this:

let redis_manager = match redis_client.get_tokio_connection_manager().await {
    Ok(rm) => rm,
    Err(err) => {
        eprintln!("Failed to initialise Redis connection manager for {}: {}", redis_url, err);
        std::process::exit(1);
    }
};
debug!("Initialised Redis connection pool to {}", redis_url);
does not give each of your async execution contexts exclusive access to the underlying connection.

And so connections cloned by actix, like this:

redis_conn_manager: redis_manager.clone(),
are all sharing that same underlying connection.
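To make that concrete, here's a minimal sketch (assuming a redis_client built elsewhere) of why cloning doesn't help:

// Minimal sketch: every clone of a ConnectionManager is a handle to the
// same underlying MultiplexedConnection, i.e. one TCP connection to Redis.
let manager = redis_client.get_tokio_connection_manager().await?;
let mut handle_a = manager.clone();
let mut handle_b = manager.clone();
// A WATCH issued through `handle_a` applies to the shared connection, so an
// EXEC (or UNWATCH) sent through `handle_b` will clear it.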

Maybe the red herring is this log line:

debug!("Initialised Redis connection pool to {}", redis_url);

ConnectionManager doesn't actually perform any pooling; it just wraps a MultiplexedConnection that reconnects automatically.

I hope I'm wrong about all this and that I've misunderstood some detail of how these connections are managed (or some detail about actix, for that matter; I'm using warp). But I'm pretty sure what's happening under the hood is the following:

If we were getting actual transaction isolation, you would expect the following code to loop forever:

use redis::{pipe, AsyncCommands, RedisResult};

macro_rules! async_transaction {
    ($conn:expr, $keys:expr, $body:expr) => {
        loop {
            redis::cmd("WATCH").arg($keys).query_async($conn).await?;

            if let Some(response) = $body {
                redis::cmd("UNWATCH").query_async($conn).await?;
                break response;
            }
        }
    };
}

#[tokio::main]
async fn main() -> RedisResult<()> {
    let mut conn1 = redis::Client::open("redis://127.0.0.1:6379")?
        .get_multiplexed_tokio_connection() // This is what connection manager uses under the hood
        .await?;

    conn1.set("hello", 2).await?;

    let (_,): (isize,) = async_transaction!(&mut conn1, &["hello"], {
        println!("outer closure runs");
        let value: i32 = conn1.get("hello").await?;
        println!("GET `hello`: {}", value);

        {
            let mut conn2 = conn1.clone(); // This is a second, concurrent request's connection cloned by actix.
            let (_,): (isize,) = async_transaction!(&mut (conn2.clone()), &["hello"], {
                println!("inner closure run");
                let value: i32 = conn2.get("hello").await?;
                println!("GET `hello`: {}", value);
                pipe().atomic()
                    .set("hello", value + 1).ignore()
                    .get("hello")
                    .query_async(&mut conn2)
                    .await? // un-watches the keys from the outer async_transaction
            });
        }

        pipe().atomic()
            .set("hello", value + 1).ignore()
            .get("hello")
            .query_async(&mut conn1)
            .await? // This should fail, and try again
    });

    let value: i32 = conn1.get("hello").await?;
    println!("value in redis: {}", value);

    Ok(())
}

But it completes with a final value of 3, demonstrating a race condition: the inner transaction's EXEC clears all WATCHed keys on the shared connection, so the outer EXEC runs unconditionally instead of failing and retrying.

If however the inner async_transaction gets its own connection:

- let mut conn2 = conn1.clone();
+ let mut conn2 = redis::Client::open("redis://127.0.0.1:6379")?
        .get_multiplexed_tokio_connection()
        .await?;

Then this code loops forever, as expected.

So my current takeaway, for the kinds of applications we're building, is that we need some sort of connection pooling, and perhaps we'd want to hook into deadpool's post-recycle hook to be sure we're not still watching keys from a previous request that returned its connection to the pool without un-watching them (a sketch of that clean-up step is below).
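Something like this is what I have in mind; it's only a sketch (the reset_for_reuse name and the wiring into deadpool's recycle hook are hypothetical):

use redis::aio::MultiplexedConnection;
use redis::RedisResult;

// Hypothetical clean-up step to run before a connection goes back into the
// pool, e.g. from a post-recycle hook. UNWATCH is idempotent, so it's safe
// to call even when nothing is being watched.
async fn reset_for_reuse(conn: &mut MultiplexedConnection) -> RedisResult<()> {
    redis::cmd("UNWATCH").query_async::<_, ()>(conn).await
}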

I'd love to get your thoughts on this, as the architecture of my application seems similar to yours.

davechallis (Owner) commented

Yup, good spot, I think you're right.

I haven't had much time to look into it yet, but I remember that the StackExchange.Redis client deals with this, and handles transactions in multiplexed connections.

I haven't looked at their code yet, but it might be worth seeing whether they've got a useful approach to borrow.

Otherwise, where transactions are involved, either some form of mutex (to avoid interspersed WATCHes) or use of connection pooling would work here. The latter is probably the simplest approach at first glance.
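For the pooling option, a minimal sketch along these lines (assuming deadpool-redis ~0.10; the URL is a placeholder and the exact builder API may differ) would give each request its own connection, so WATCH state is never shared:

use deadpool_redis::{Config, Runtime};

// Each checkout from the pool is a dedicated connection, so a WATCH issued
// by one request can't be cleared by another request's EXEC or UNWATCH.
let cfg = Config::from_url("redis://127.0.0.1:6379");
let pool = cfg.create_pool(Some(Runtime::Tokio1))?;

let mut conn = pool.get().await?; // exclusive for the lifetime of this handle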

davechallis pushed a commit that referenced this issue Jan 7, 2022
…nnections instead of a connection manager, in order to avoid issues related to interspersed WATCH commands on the same thread. See #26.
davechallis added a commit that referenced this issue Jan 14, 2022
…nnections instead of a connection manager, in order to avoid issues related to interspersed WATCH commands on the same thread. See #26.
davechallis (Owner) commented

Fixed in 43cd03f.
