Skip to content

Commit

Permalink
Redis: Refactor delayed-message enqueuing (#103)
Browse files Browse the repository at this point in the history
Make the code here a bit more generic and less ambiguous by requiring
InternalPayload instead of &[u8] to be passed to the functions
that move messages from delayed queue to the main queue. This will
enable reusing these functions later for things like redriving DLQs.
  • Loading branch information
jaymell authored Sep 6, 2024
1 parent 936837c commit b49857b
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
16 changes: 8 additions & 8 deletions omniqueue/src/backends/redis/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,18 @@ impl<R: RedisConnection> Acker for RedisFallbackAcker<R> {
}

pub(super) async fn add_to_main_queue(
keys: &[Vec<u8>],
keys: Vec<InternalPayload<'_>>,
main_queue_name: &str,
conn: &mut impl AsyncCommands,
) -> Result<()> {
// We don't care about existing `num_receives`
// since we're pushing onto a different queue.
let new_keys = keys
.iter()
.map(|x| regenerate_key(x))
.collect::<Result<Vec<_>>>()?;
.into_iter()
// So reset it to avoid carrying state over:
.map(|x| InternalPayload::new(x.payload))
.map(internal_to_list_payload)
.collect::<Vec<_>>();
let _: () = conn
.lpush(main_queue_name, new_keys)
.await
Expand Down Expand Up @@ -284,7 +288,3 @@ async fn reenqueue_timed_out_messages<R: RedisConnection>(

Ok(())
}

fn regenerate_key(key: &[u8]) -> Result<RawPayload> {
Ok(internal_to_list_payload(internal_from_list(key)?))
}
20 changes: 14 additions & 6 deletions omniqueue/src/backends/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -592,23 +592,31 @@ async fn background_task_delayed<R: RedisConnection>(
let timestamp = unix_timestamp(SystemTime::now() - Duration::from_secs(1))
.map_err(QueueError::generic)?;

let keys: Vec<RawPayload> = conn
let old_keys: Vec<RawPayload> = conn
.zrangebyscore_limit(delayed_queue_name, 0, timestamp, 0, BATCH_SIZE)
.await
.map_err(QueueError::generic)?;

if !keys.is_empty() {
trace!("Moving {} messages from delayed to main queue", keys.len());
if !old_keys.is_empty() {
let new_keys = old_keys
.iter()
.map(|x| internal_from_list(x))
.collect::<Result<Vec<_>>>()?;
trace!(
"Moving {} messages from delayed to main queue",
new_keys.len()
);

if use_redis_streams {
streams::add_to_main_queue(&keys, main_queue_name, payload_key, &mut *conn).await?;
streams::add_to_main_queue(new_keys, main_queue_name, payload_key, &mut *conn)
.await?;
} else {
fallback::add_to_main_queue(&keys, main_queue_name, &mut *conn).await?;
fallback::add_to_main_queue(new_keys, main_queue_name, &mut *conn).await?;
}

// Then remove the tasks from the delayed queue so they aren't resent
let _: () = conn
.zrem(delayed_queue_name, keys)
.zrem(delayed_queue_name, old_keys)
.await
.map_err(QueueError::generic)?;

Expand Down
17 changes: 9 additions & 8 deletions omniqueue/src/backends/redis/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use redis::{
use tracing::{error, trace};

use super::{
internal_from_list, DeadLetterQueueConfig, InternalPayload, InternalPayloadOwned,
RedisConnection, RedisConsumer, RedisProducer,
DeadLetterQueueConfig, InternalPayload, InternalPayloadOwned, RedisConnection, RedisConsumer,
RedisProducer,
};
use crate::{queue::Acker, Delivery, QueueError, Result};

Expand Down Expand Up @@ -241,20 +241,21 @@ impl<R: RedisConnection> Acker for RedisStreamsAcker<R> {
}

pub(super) async fn add_to_main_queue(
keys: &[Vec<u8>],
keys: Vec<InternalPayload<'_>>,
main_queue_name: &str,
payload_key: &str,
conn: &mut impl redis::aio::ConnectionLike,
) -> Result<()> {
let mut pipe = redis::pipe();
for key in keys {
// We don't care about `num_receives` here since we're
// re-queuing from delayed queue:
let InternalPayload { payload, .. } = internal_from_list(key)?;
// We don't care about existing `num_receives`
// since we're pushing onto a different queue.
for InternalPayload { payload, .. } in keys {
// So reset it to avoid carrying state over:
let internal = InternalPayload::new(payload);
let _ = pipe.xadd(
main_queue_name,
GENERATE_STREAM_ID,
internal_to_stream_payload!(InternalPayload::new(payload), payload_key),
internal_to_stream_payload!(internal, payload_key),
);
}

Expand Down

0 comments on commit b49857b

Please sign in to comment.