Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add redis sentinel support #107

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions omniqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ serde = { version = "1.0.196", features = ["derive"] }
tokio = { version = "1", features = ["macros"] }
tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"
rstest = "0.23.0"

[features]
default = ["in_memory", "gcp_pubsub", "rabbitmq", "redis", "redis_cluster", "sqs"]
Expand All @@ -49,6 +50,7 @@ rabbitmq = ["dep:futures-util", "dep:lapin"]
rabbitmq-with-message-ids = ["rabbitmq", "dep:svix-ksuid"]
redis = ["dep:bb8", "dep:bb8-redis", "dep:redis", "dep:svix-ksuid"]
redis_cluster = ["dep:async-trait", "redis", "redis/cluster-async"]
redis_sentinel = ["dep:async-trait", "redis", "redis/sentinel"]
sqs = ["dep:aws-config", "dep:aws-sdk-sqs"]
azure_queue_storage = ["dep:azure_storage", "dep:azure_storage_queues"]
beta = []
75 changes: 66 additions & 9 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ use std::{

use bb8::ManageConnection;
pub use bb8_redis::RedisConnectionManager;
#[cfg(feature = "redis_sentinel")]
use redis::{sentinel::SentinelNodeConnectionInfo, ProtocolVersion, RedisConnectionInfo, TlsMode};
use redis::{AsyncCommands, ExistenceCheck, SetExpiry, SetOptions};
use serde::Serialize;
use svix_ksuid::KsuidLike;
Expand All @@ -58,30 +60,65 @@ use crate::{
#[cfg(feature = "redis_cluster")]
mod cluster;
mod fallback;
#[cfg(feature = "redis_sentinel")]
mod sentinel;
mod streams;

#[cfg(feature = "redis_cluster")]
pub use cluster::RedisClusterConnectionManager;
#[cfg(feature = "redis_sentinel")]
pub use sentinel::RedisSentinelConnectionManager;

pub trait RedisConnection:
ManageConnection<
Connection: redis::aio::ConnectionLike + Send + Sync,
Error: std::error::Error + Send + Sync + 'static,
>
{
fn from_dsn(dsn: &str) -> Result<Self>;
fn from_config(config: &RedisConfig) -> Result<Self>;
}

impl RedisConnection for RedisConnectionManager {
fn from_dsn(dsn: &str) -> Result<Self> {
Self::new(dsn).map_err(QueueError::generic)
fn from_config(config: &RedisConfig) -> Result<Self> {
Self::new(config.dsn.as_str()).map_err(QueueError::generic)
}
}

#[cfg(feature = "redis_cluster")]
impl RedisConnection for RedisClusterConnectionManager {
fn from_dsn(dsn: &str) -> Result<Self> {
Self::new(dsn).map_err(QueueError::generic)
fn from_config(config: &RedisConfig) -> Result<Self> {
Self::new(config.dsn.as_str()).map_err(QueueError::generic)
}
}

#[cfg(feature = "redis_sentinel")]
impl RedisConnection for RedisSentinelConnectionManager {
fn from_config(config: &RedisConfig) -> Result<Self> {
let cfg = config
.sentinel_config
.clone()
.ok_or(QueueError::Unsupported("Missing sentinel configuration"))?;

let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure);
let protocol = if cfg.redis_use_resp3 {
ProtocolVersion::RESP3
} else {
ProtocolVersion::default()
};
RedisSentinelConnectionManager::new(
vec![config.dsn.as_str()],
cfg.service_name.clone(),
Some(SentinelNodeConnectionInfo {
tls_mode,
redis_connection_info: Some(RedisConnectionInfo {
db: cfg.redis_db.unwrap_or(0),
username: cfg.redis_username.clone(),
password: cfg.redis_password.clone(),
protocol,
}),
}),
)
.map_err(QueueError::generic)
}
}

Expand Down Expand Up @@ -233,6 +270,17 @@ pub struct RedisConfig {
pub payload_key: String,
pub ack_deadline_ms: i64,
pub dlq_config: Option<DeadLetterQueueConfig>,
pub sentinel_config: Option<SentinelConfig>,
}

#[derive(Clone)]
pub struct SentinelConfig {
pub service_name: String,
pub redis_tls_mode_secure: bool,
pub redis_db: Option<i64>,
pub redis_username: Option<String>,
pub redis_password: Option<String>,
pub redis_use_resp3: bool,
}

#[derive(Clone)]
Expand Down Expand Up @@ -269,6 +317,12 @@ impl RedisBackend {
pub fn cluster_builder(config: RedisConfig) -> RedisClusterBackendBuilder {
RedisBackendBuilder::new(config)
}

#[cfg(feature = "redis_sentinel")]
/// Creates a new redis sentinel queue builder with the given configuration.
pub fn sentinel_builder(config: RedisConfig) -> RedisSentinelBackendBuilder {
RedisBackendBuilder::new(config)
}
}

#[allow(deprecated)]
Expand Down Expand Up @@ -305,8 +359,11 @@ pub struct RedisBackendBuilder<R = RedisConnectionManager, S = Static> {
#[cfg(feature = "redis_cluster")]
pub type RedisClusterBackendBuilder = RedisBackendBuilder<RedisClusterConnectionManager>;

#[cfg(feature = "redis_sentinel")]
pub type RedisSentinelBackendBuilder = RedisBackendBuilder<RedisSentinelConnectionManager>;

impl<R: RedisConnection> RedisBackendBuilder<R> {
fn new(config: RedisConfig) -> Self {
pub fn new(config: RedisConfig) -> Self {
Self {
config,
use_redis_streams: true,
Expand Down Expand Up @@ -372,7 +429,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
}

pub async fn build_pair(self) -> Result<(RedisProducer<R>, RedisConsumer<R>)> {
let redis = R::from_dsn(&self.config.dsn)?;
let redis = R::from_config(&self.config)?;
let redis = bb8::Pool::builder()
.max_size(self.config.max_connections.into())
.build(redis)
Expand Down Expand Up @@ -407,7 +464,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
}

pub async fn build_producer(self) -> Result<RedisProducer<R>> {
let redis = R::from_dsn(&self.config.dsn)?;
let redis = R::from_config(&self.config)?;
let redis = bb8::Pool::builder()
.max_size(self.config.max_connections.into())
.build(redis)
Expand All @@ -427,7 +484,7 @@ impl<R: RedisConnection> RedisBackendBuilder<R> {
}

pub async fn build_consumer(self) -> Result<RedisConsumer<R>> {
let redis = R::from_dsn(&self.config.dsn)?;
let redis = R::from_config(&self.config)?;
let redis = bb8::Pool::builder()
.max_size(self.config.max_connections.into())
.build(redis)
Expand Down
56 changes: 56 additions & 0 deletions omniqueue/src/backends/redis/sentinel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use async_trait::async_trait;
use redis::{
sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType},
ErrorKind, IntoConnectionInfo, RedisError,
};
use tokio::sync::Mutex;

// The mutex here is needed b/c there's currently
// no way to get connections in the redis sentinel client
// without a mutable reference to the underlying client.
struct LockedSentinelClient(pub(crate) Mutex<SentinelClient>);
jaymell marked this conversation as resolved.
Show resolved Hide resolved

/// ConnectionManager that implements `bb8::ManageConnection` and supports
/// asynchronous Sentinel connections via `redis::sentinel::SentinelClient`
pub struct RedisSentinelConnectionManager {
client: LockedSentinelClient,
}

impl RedisSentinelConnectionManager {
pub fn new<T: IntoConnectionInfo>(
info: Vec<T>,
service_name: String,
node_connection_info: Option<SentinelNodeConnectionInfo>,
) -> Result<RedisSentinelConnectionManager, RedisError> {
Ok(RedisSentinelConnectionManager {
client: LockedSentinelClient(Mutex::new(SentinelClient::build(
info,
service_name,
node_connection_info,
SentinelServerType::Master,
)?)),
})
}
}

#[async_trait]
impl bb8::ManageConnection for RedisSentinelConnectionManager {
type Connection = redis::aio::MultiplexedConnection;
type Error = RedisError;

async fn connect(&self) -> Result<Self::Connection, Self::Error> {
self.client.0.lock().await.get_async_connection().await
}

async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> {
let pong: String = redis::cmd("PING").query_async(conn).await?;
match pong.as_str() {
"PONG" => Ok(()),
_ => Err((ErrorKind::ResponseError, "ping request").into()),
}
}

fn has_broken(&self, _: &mut Self::Connection) -> bool {
false
}
}
2 changes: 1 addition & 1 deletion omniqueue/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod azure_queue_storage;
mod gcp_pubsub;
#[cfg(feature = "rabbitmq")]
mod rabbitmq;
#[cfg(feature = "redis")]
#[cfg(any(feature = "redis", feature = "redis_sentinel"))]
mod redis;
#[cfg(feature = "redis_cluster")]
mod redis_cluster;
Expand Down
Loading