延迟队列的实现思路
定义
延迟队列指的是元素按照延迟时间进行排序形成并且到时后能自动弹出的有序队列,底层数据结构既可以是数组也可以是链表
适用场景
按照倒计时触发的业务场景,例如电商网站中的订单未支付自动取消,竞拍,日历待办提醒的场景,以及用定时任务扫表触发的业务场景
实现原理
如果让你来设计延时队列,你需要怎么样来设计?
先来个基础1.0的设计:要实现延时队列需要两个角色,第一个是存储信息的队列,第二个角色是’计时器’负责监视队列中的消息时候到期
图示2
实现方式
DelayQueue
public static void main(String[] args) {
//1.队列的角色
DelayQueue<DelayedTask> queue = new DelayQueue<>();
DelayedTask task1 = new DelayedTask(1_0L, TimeUnit.SECONDS, () -> System.out.println(Thread.currentThread().getName() + "_1:" + LocalDateTime.now()));
DelayedTask task5 = new DelayedTask(1_5L, TimeUnit.SECONDS, () -> System.out.println(Thread.currentThread().getName() + "_2:" + LocalDateTime.now()));
queue.add(task1);
queue.add(task5);
System.out.println(LocalDateTime.now());
//2.计时器的角色
while (!queue.isEmpty()){
DelayedTask task = queue.poll();
task.getRunnable().run();
}
}
- 源码解析
DelayQueue底层采用的是PriorityQueue,一种提供优先级的队列,poll()方法如下:
public E poll() {
//PriorityQueue是线程不安全的,因此需要用lock
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();
} finally {
lock.unlock();
}
}
这样的延时队列可以解决单个进程下的延迟队列场景,但是无法解决多个应用下的场景,那么如何在实现分布式的延迟队列喃?
先从问题分析,我们想要实现的是分布式的延迟队列,并且知道延迟队列是由定时器和队列构成,定时器由客户端实现,那么问题就变成了我们要实现一个分布式的具有优先级的队列结构,聪明的你一定就想到了这不是redis中的zset嘛?对,下面我们来看一下基于redis实现延迟队列的1.0版本
基于Redis的延迟队列
zset定义:排序集合,类似于集合,但每个字符串元素都与一个称为得分的浮点值相关联。 元素总是按它们的分数排序,因此与Sets不同,可以检索一系列元素
- zset
public static void main(String[] args) throws InterruptedException {
RedissonClient redisClient = getRedisClient();
RScoredSortedSet<String> zset = redisClient.getScoredSortedSet("redisDelayQueue");
zset.add(getTimeLong(10L), "a");
zset.add(getTimeLong(20L), "c");
zset.add(getTimeLong(15L), "b");
while (true) {
Collection<String> values = zset.valueRange(getTimeLong(-1L), true, getTimeLong(0L), false);
values.forEach(System.out::println);
TimeUnit.SECONDS.sleep(1L);
}
}
public static RedissonClient getRedisClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379");
RedissonClient client = Redisson.create(config);
return client;
}
public static Long getTimeLong(Long seconds) {
return LocalDateTime.now().plusSeconds(seconds).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}
- RedissonDelayedTask
public static void main(String[] args) throws InterruptedException {
RedissonClient redisClient = getRedisClient();
RBlockingQueue<String> blockingFairQueue = redisClient.getBlockingQueue("RedissonDelayed");
RDelayedQueue<String> delayedQueue = redisClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer("a",10, TimeUnit.SECONDS);
delayedQueue.offer("c",20, TimeUnit.SECONDS);
delayedQueue.offer("b",15, TimeUnit.SECONDS);
while (true){
System.out.println(blockingFairQueue.take());
}
}
Redisson也实现了延迟队列(RedissonDelayedQueue),底层数据结构使用的是zset,list,发布/订阅,并且也不是想我们现在这样通过while和方式来监听变化的,感兴趣的童鞋可以看一下Redisson实现时使用的Lua脚本
目前Redisson基于reids的延迟队列在功能上很完善了作为延迟队列来说已经是满足的了,但是作为一个延时消息队列来说还缺少重试机制,ACK,因此下面介绍两种基于MQ实现延迟队列的方式
基于RabbitMQ的延迟队列
基于TTL的实现方式
基于生存时间(TTL)和死信队列(DLX)特性实现的延迟队列;TTL指的是每条消息都有一个生存时间,超过过期时间后消息就会进入一个特殊队列,这个队列就是死信队列(DLX),DLX可以将消息重新投递到指定的队列中,consumer只需要订阅这个队列就可以实现延迟消费的功能
- Provider
- consumer