Skip to content

Commit

Permalink
Add redis sentinel support
Browse files Browse the repository at this point in the history
There are a few breaking changes made to support this.
Most notably, the an optional `SentinelConfig` struct
has been added to the RedisConfig object. It seems like
obfuscating the fields here might be a good idea, but
that's not how we've been rolling thus far with the
config objects.

Also, the `RedisConnection` trait has been redone
to take a `RedisConfig` instead of a DSN in order
to build the connection. Sentinel needs it owing
to the large number of potential config params needed
for Sentinel, and I think this is probably a better
approach anyway.

The base redis tests (well, most of them) have also
been updated to support testing against either
base Redis or Sentinel. The `rstest` crate has been
added in order to be able to parameterize the tests
sanely. We also use this crate for tests in `redis-rs`,
so I think it's a no-brainer to add here.
  • Loading branch information
jaymell committed Oct 3, 2024
1 parent a2817f2 commit 1feda18
Show file tree
Hide file tree
Showing 8 changed files with 300 additions and 40 deletions.
2 changes: 2 additions & 0 deletions omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ serde = { version = "1.0.196", features = ["derive"] }
tokio = { version = "1", features = ["macros"] }
tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"
rstest = "0.23.0"

[features]
default = ["in_memory", "gcp_pubsub", "rabbitmq", "redis", "redis_cluster", "sqs"]
Expand All @@ -49,6 +50,7 @@ rabbitmq = ["dep:futures-util", "dep:lapin"]
rabbitmq-with-message-ids = ["rabbitmq", "dep:svix-ksuid"]
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
redis_cluster = ["dep:async-trait", "redis", "redis/cluster-async"]
redis_sentinel = ["dep:async-trait", "redis", "redis/sentinel"]
sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"]
beta = []
75 changes: 66 additions & 9 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use std::{

use bb8::ManageConnection;
pub use bb8_redis::RedisConnectionManager;
#[cfg(feature = "redis_sentinel")]
use redis::{sentinel::SentinelNodeConnectionInfo, ProtocolVersion, RedisConnectionInfo, TlsMode};
use redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions};
use serde::Serialize;
use svix_ksuid::KsuidLike;
Expand All @@ -58,30 +60,65 @@ use crate::{
#[cfg(feature = "redis_cluster")]
mod cluster;
mod fallback;
#[cfg(feature = "redis_sentinel")]
mod sentinel;
mod streams;

#[cfg(feature = "redis_cluster")]
pub use cluster::RedisClusterConnectionManager;
#[cfg(feature = "redis_sentinel")]
pub use sentinel::RedisSentinelConnectionManager;

pub trait RedisConnection:
ManageConnection<
Connection: redis::aio::ConnectionLike + Send + Sync,
Error: std::error::Error + Send + Sync + 'static,
>
{
fn from_dsn(dsn: &str) -> Result<Self>;
fn from_config(config: &RedisConfig) -> Result<Self>;
}

impl RedisConnection for RedisConnectionManager {
fn from_dsn(dsn: &str) -> Result<Self> {
Self::new(dsn).map_err(QueueError::generic)
fn from_config(config: &RedisConfig) -> Result<Self> {
Self::new(config.dsn.as_str()).map_err(QueueError::generic)
}
}

#[cfg(feature = "redis_cluster")]
impl RedisConnection for RedisClusterConnectionManager {
fn from_dsn(dsn: &str) -> Result<Self> {
Self::new(dsn).map_err(QueueError::generic)
fn from_config(config: &RedisConfig) -> Result<Self> {
Self::new(config.dsn.as_str()).map_err(QueueError::generic)
}
}

#[cfg(feature = "redis_sentinel")]
impl RedisConnection for RedisSentinelConnectionManager {
fn from_config(config: &RedisConfig) -> Result<Self> {
let cfg = config
.sentinel_config
.clone()
.ok_or(QueueError::Unsupported("Missing sentinel configuration"))?;

let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure);
let protocol = if cfg.redis_use_resp3 {
ProtocolVersion::RESP3
} else {
ProtocolVersion::default()
};
RedisSentinelConnectionManager::new(
vec![config.dsn.as_str()],
cfg.service_name.clone(),
Some(SentinelNodeConnectionInfo {
tls_mode,
redis_connection_info: Some(RedisConnectionInfo {
db: cfg.redis_db.unwrap_or(0),
username: cfg.redis_username.clone(),
password: cfg.redis_password.clone(),
protocol,
}),
}),
)
.map_err(QueueError::generic)
}
}

Expand Down Expand Up @@ -233,6 +270,17 @@ pub struct RedisConfig {
pub payload_key: String,
pub ack_deadline_ms: i64,
pub dlq_config: Option<DeadLetterQueueConfig>,
pub sentinel_config: Option<SentinelConfig>,
}

#[derive(Clone)]
pub struct SentinelConfig {
pub service_name: String,
pub redis_tls_mode_secure: bool,
pub redis_db: Option<i64>,
pub redis_username: Option<String>,
pub redis_password: Option<String>,
pub redis_use_resp3: bool,
}

#[derive(Clone)]
Expand Down Expand Up @@ -269,6 +317,12 @@ impl RedisBackend {
pub fn cluster_builder(config: RedisConfig) -> RedisClusterBackendBuilder {
RedisBackendBuilder::new(config)
}

#[cfg(feature = "redis_sentinel")]
/// Creates a new redis sentinel queue builder with the given configuration.
pub fn sentinel_builder(config: RedisConfig) -> RedisSentinelBackendBuilder {
RedisBackendBuilder::new(config)
}
}

#[allow(deprecated)]
Expand Down Expand Up @@ -305,8 +359,11 @@ pub struct RedisBackendBuilder<R = RedisConnectionManager, S = Static> {
#[cfg(feature = "redis_cluster")]
pub type RedisClusterBackendBuilder = RedisBackendBuilder<RedisClusterConnectionManager>;

#[cfg(feature = "redis_sentinel")]
pub type RedisSentinelBackendBuilder = RedisBackendBuilder<RedisSentinelConnectionManager>;

impl<R: RedisConnection> RedisBackendBuilder<R> {
fn new(config: RedisConfig) -> Self {
pub fn new(config: RedisConfig) -> Self {
Self {
config,
use_redis_streams: true,
Expand Down Expand Up @@ -372,7 +429,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
}

pub async fn build_pair(self) -> Result<(RedisProducer<R>, RedisConsumer<R>)> {
let redis = R::from_dsn(&self.config.dsn)?;
let redis = R::from_config(&self.config)?;
let redis = bb8::Pool::builder()
.max_size(self.config.max_connections.into())
.build(redis)
Expand Down Expand Up @@ -407,7 +464,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
}

pub async fn build_producer(self) -> Result<RedisProducer<R>> {
let redis = R::from_dsn(&self.config.dsn)?;
let redis = R::from_config(&self.config)?;
let redis = bb8::Pool::builder()
.max_size(self.config.max_connections.into())
.build(redis)
Expand All @@ -427,7 +484,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
}

pub async fn build_consumer(self) -> Result<RedisConsumer<R>> {
let redis = R::from_dsn(&self.config.dsn)?;
let redis = R::from_config(&self.config)?;
let redis = bb8::Pool::builder()
.max_size(self.config.max_connections.into())
.build(redis)
Expand Down
56 changes: 56 additions & 0 deletions omniqueue/src/backends/redis/sentinel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use async_trait::async_trait;
use redis::{
sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType},
ErrorKind, IntoConnectionInfo, RedisError,
};
use tokio::sync::Mutex;

// The mutex here is needed b/c there's currently
// no way to get connections in the redis sentinel client
// without a mutable reference to the underlying client.
struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>);

/// ConnectionManager that implements `bb8::ManageConnection` and supports
/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient`
pub struct RedisSentinelConnectionManager {
client: LockedSentinelClient,
}

impl RedisSentinelConnectionManager {
pub fn new<T: IntoConnectionInfo>(
info: Vec<T>,
service_name: String,
node_connection_info: Option<SentinelNodeConnectionInfo>,
) -> Result<RedisSentinelConnectionManager, RedisError> {
Ok(RedisSentinelConnectionManager {
client: LockedSentinelClient(Mutex::new(SentinelClient::build(
info,
service_name,
node_connection_info,
SentinelServerType::Master,
)?)),
})
}
}

#[async_trait]
impl bb8::ManageConnection for RedisSentinelConnectionManager {
type Connection = redis::aio::MultiplexedConnection;
type Error = RedisError;

async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.client.0.lock().await.get_async_connection().await
}

async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
let pong: String = redis::cmd("PING").query_async(conn).await?;
match pong.as_str() {
"PONG" => Ok(()),
_ => Err((ErrorKind::ResponseError, "ping request").into()),
}
}

fn has_broken(&self, _: &mut Self::Connection) -> bool {
false
}
}
2 changes: 1 addition & 1 deletion omniqueue/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod azure_queue_storage;
mod gcp_pubsub;
#[cfg(feature = "rabbitmq")]
mod rabbitmq;
#[cfg(feature = "redis")]
#[cfg(any(feature = "redis", feature = "redis_sentinel"))]
mod redis;
#[cfg(feature = "redis_cluster")]
mod redis_cluster;
Expand Down
Loading

0 comments on commit 1feda18

Please sign in to comment.