【Distributed System】消息队列 - 延时队列(Delay Queues)

Posted by 西维蜀黍 on 2024-09-19, Last Modified on 2024-09-25

延时队列(Delay Queues)

Delay queues let you postpone the delivery of new messages to consumers for a number of seconds, for example, when your consumer application needs additional time to process messages. If you create a delay queue, any messages that you send to the queue remain invisible to consumers for the duration of the delay period.

应用场景

延时消息适用于以下场景:

  • 比如超时关单,即用户在电商平台下单后没有立即支付,等超过指定时间后订单自动关闭。
  • 比如回调重试:对于异步接口来说,如果给调用方回调时,由于网络不通或其他原因导致回调失败时,我们可以采用延时策略对调用方的回调接口进行重试。为了避免因网络抖动或其他原因造成的回调失败,我们可以采用的延时策略为 1min 5 min 10 min 30 min 1hour 等间隔进行回调。
  • 各种延时提醒:比如用户下单未支付时,系统在关单前 10 分钟提醒用户去支付。比如在二手车系统中,提醒买手尽快出价。

实现

DelayQueue 延时队列

JDK 中提供了一组实现延迟队列的 API,位于 Java.util.concurrent 包下 DelayQueue

DelayQueue 是一个 BlockingQueue(无界阻塞)队列,它本质就是封装了一个 PriorityQueue(优先队列),PriorityQueue 内部使用完全二叉堆来实现队列元素排序,我们在向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。

Redis sorted set

Redis 的数据结构 Zset,同样可以实现延迟队列的效果,主要利用它的 score 属性,redis 通过 score 来为集合中的成员进行从小到大的排序。

通过 zadd 命令向队列 delayqueue 中添加元素,并设置 score 值表示元素过期的时间;向 delayqueue 添加三个 order1order2order3,分别是 10秒20秒30秒后过期。

 zadd delayqueue 3 order3

消费端轮询队列 delayqueue, 将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除 key

消费端轮询队列 delayqueue, 将元素排序后取最小时间与当前时间比对,如小于当前时间代表已经过期移除 key

    /**
     * 消费消息
     */
    public void pollOrderQueue() {

        while (true) {
            Set<Tuple> set = jedis.zrangeWithScores(DELAY_QUEUE, 0, 0);

            String value = ((Tuple) set.toArray()[0]).getElement();
            int score = (int) ((Tuple) set.toArray()[0]).getScore();
            
            Calendar cal = Calendar.getInstance();
            int nowSecond = (int) (cal.getTimeInMillis() / 1000);
            if (nowSecond >= score) {
                jedis.zrem(DELAY_QUEUE, value);
                System.out.println(sdf.format(new Date()) + " removed key:" + value);
            }

            if (jedis.zcard(DELAY_QUEUE) <= 0) {
                System.out.println(sdf.format(new Date()) + " zset empty ");
                return;
            }
            Thread.sleep(1000);
        }
    }

Redis 过期回调

Rediskey 过期回调事件,也能达到延迟队列的效果,简单来说我们开启监听 key 是否过期的事件,一旦 key 过期会触发一个 callback 事件。

修改 redis.conf 文件开启 notify-keyspace-events Ex

notify-keyspace-events Ex
Redis`监听配置注入Bean `RedisMessageListenerContainer
@Configuration
public class RedisListenerConfig {
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}

编写 Redis 过期回调监听方法,必须继承 KeyExpirationEventMessageListener ,有点类似于 MQ 的消息监听。

RabbitMQ 延时队列

利用 RabbitMQ 做延时队列是比较常见的一种方式,而实际上 RabbitMQ 自身并没有直接支持提供延迟队列功能,而是通过 RabbitMQ 消息队列的 TTLDXL 这两个属性间接实现的。

先来认识一下 TTLDXL 两个概念:

Time To Live(TTL) :

TTL 顾名思义:指的是消息的存活时间,RabbitMQ 可以通过 x-message-tt 参数来设置指定 Queue(队列)和 Message(消息)上消息的存活时间,它的值是一个非负整数,单位为微秒。

RabbitMQ` 可以从两种维度设置消息过期时间,分别是`队列`和`消息本身
  • 设置队列过期时间,那么队列中所有消息都具有相同的过期时间。
  • 设置消息过期时间,对队列中的某一条消息设置过期时间,每条消息 TTL 都可以不同。

如果同时设置队列和队列中消息的 TTL,则 TTL 值以两者中较小的值为准。而队列中的消息存在队列中的时间,一旦超过 TTL 过期时间则成为 Dead Letter(死信)。

Dead Letter ExchangesDLX

DLX 即死信交换机,绑定在死信交换机上的即死信队列。RabbitMQQueue(队列)可以配置两个参数 x-dead-letter-exchangex-dead-letter-routing-key(可选),一旦队列内出现了 Dead Letter(死信),则按照这两个参数可以将消息重新路由到另一个 Exchange(交换机),让消息重新被消费。

x-dead-letter-exchange:队列中出现 Dead Letter 后将 Dead Letter 重新路由转发到指定 exchange(交换机)。

x-dead-letter-routing-key:指定 routing-key 发送,一般为要指定转发的队列。

队列出现 Dead Letter 的情况有:

  • 消息或者队列的 TTL 过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)

下边结合一张图看看如何实现超 30 分钟未支付关单功能,我们将订单消息 A0001 发送到延迟队列 order.delay.queue,并设置 x-message-tt 消息存活时间为 30 分钟,当到达 30 分钟后订单消息 A0001 成为了 Dead Letter(死信),延迟队列检测到有死信,通过配置 x-dead-letter-exchange,将死信重新转发到能正常消费的关单队列,直接监听关单队列处理关单逻辑即可。

时间轮

前边几种延时队列的实现方法相对简单,比较容易理解,时间轮算法就稍微有点抽象了。kafkanetty 都有基于时间轮算法实现延时队列,下边主要实践 Netty 的延时队列讲一下时间轮是什么原理。

先来看一张时间轮的原理图,解读一下时间轮的几个基本概念

wheel :时间轮,图中的圆盘可以看作是钟表的刻度。比如一圈 round 长度为 24秒,刻度数为 8,那么每一个刻度表示 3秒。那么时间精度就是 3秒。时间长度 / 刻度数值越大,精度越大。

当添加一个定时、延时任务A,假如会延迟 25秒后才会执行,可时间轮一圈 round 的长度才 24秒,那么此时会根据时间轮长度和刻度得到一个圈数 round 和对应的指针位置 index,也是就任务A 会绕一圈指向 0格子上,此时时间轮会记录该任务的 roundindex 信息。当 round=0,index=0 ,指针指向 0格子 任务A 并不会执行,因为 round=0 不满足要求。

所以每一个格子代表的是一些时间,比如 1秒25秒 都会指向 0 格子上,而任务则放在每个格子对应的链表中,这点和 HashMap 的数据有些类似。

Netty 构建延时队列主要用 HashedWheelTimerHashedWheelTimer 底层数据结构依然是使用 DelayedQueue,只是采用时间轮的算法来实现。

Reference