延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才需要这样的队列呢?
很多时候我们会有延时处理一个任务的需求,比如说:
2个小时后给用户发送短信。
15分钟后关闭网络连接。
2分钟后再次尝试回调。
下面我们来分别探讨一下几种实现方案:
Java中的DelayQueue
Java中的DelayQueue位于java.util.concurrent包下,本质是由PriorityQueue和BlockingQueue实现的阻塞优先级队列。
放入队列的元素需要实现Delayed接口:
public interface Delayed extends Comparable<Delayed> {
* Returns the remaining delay associated with this object, in the
* given time unit.
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
long getDelay(TimeUnit unit);
}
通过实现这个接口,来完成对队列中元素,按照时间延迟先后排序的目的。
从队列中取元素:
* Retrieves and removes the head of this queue, waiting if necessary
* until an element with an expired delay is available on this queue.
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}可以看到,在这段代码里,在第一个元素的延迟时间还没到的情况下:
-
如果当前没有其他线程等待,则阻塞当前线程直到延迟时间。
-
如果有其他线程在等待,则阻塞当前线程。
向队列中放入元素:
* Inserts the specified element into this delay queue.
* @param e the element to add
* @return <tt>true</tt>
* @throws NullPointerException if the specified element is null
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
if (q.peek() == e) {
leader = null;
available.signal();
return true;
} finally {
lock.unlock();
}在放入元素的时候,会唤醒等待中的读线程。
如果我们不考虑分布式运行和任务持久化的话,Java中的DelayQueue是一个很理想的方案,精巧好用。但是如果我们需要分布式运行和任务持久化,就需要引入一些外部组件。
使用Redis实现
前文我们看到,可以通过优先级队列来实现延迟队列的功能。Redis提供了很多数据结构,其中的zset是一种有序的数据结构;我们可以通过Redis中的zset来实现一个延迟队列。
基本的方法就是使用时间戳作为元素的score存入zset。
redis> ZADD delayqueue <future_timestamp> "messsage"
获取所有已经“就绪”的message,并且删除message。
redis> MULTI
redis> ZRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> ZREMRANGEBYSCORE delayqueue 0 <current_timestamp>
redis> EXEC
但是这个方案也有一些问题:
Redis事务虽然保证了一致性和隔离性,但是并没有提供回滚功能。消息处理失败是不能被恢复的,如果处理某条消息的线程崩溃或机器宕机,这条未被处理不能被自动的再次处理。
也有考虑过将分为TODO和Doing两条队列:
先从TODO队列中取出任务,放入Doing中,再开始处理;如果停留在Doing队列总过久,则重新放入TODO队列。但是由于Redis的事务特性,并不能做到完全可靠;并且检查Doing超时的逻辑也略复杂。那么有没有一个成熟的消息队列可以支持延迟投递消息的功能呢?答案当然是有的,本文的标题就是使用RabbitMQ实现DelayQueue。
使用RabbitMQ实现
这是RabbitMQ众多隐藏的强大特性中的一个,可以轻松的降低代码的复杂度,实现DelayQueue的功能。
我们需要两个队列,一个用来做主队列,真正的投递消息;另一个用来延迟处理消息。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
connection = factory.newConnection();
channel = connection.createChannel();
channel.queueDeclare("MAIN_QUEUE", true, false, false, null);
channel.queueBind("MAIN_QUEUE", "amq.direct", "MAIN_QUEUE");
HashMap<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "amq.direct");
arguments.put("x-dead-letter-routing-key", "MAIN_QUEUE");
channel.queueDeclare("DELAY_QUEUE", true, false, false, arguments);
放入延迟消息:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
AMQP.BasicProperties properties = builder.expiration(String.valueOf(task.getDelayMillis())).deliveryMode(2).build();
channel.basicPublish("", "DELAY_QUEUE", properties, SerializationUtils.serialize(task));
而关键点,就在于 x-dead-letter-exchange 和 x-dead-letter-routing-key 两个参数上。这两个参数说明了:消息过期后的处理方式 --> 投递到我们指定的MAIN_QUEUE;然后我们只需要在MAIN_QUEUE中等待消息投递即可。
RabbitMQ本身提供了消息持久化和没有收到ACK的重投递功能,这样我们就可以实现一个高可靠的分布式延迟消息队列了。
PS:上面讲述的RabbitMQ定时任务方案有问题,RabbitMQ TTL文档 中写道:
Caveats
While consumers never see expired messages, only when expired messages reach the head of a queue will they actually be discarded (or dead-lettered). When setting a per-queue TTL this is not a problem, since expired messages are always at the head of the queue. When setting per-message TTL however, expired messages can queue up behind non-expired ones until the latter are consumed or expired. Hence resources used by such expired messages will not be freed, and they will be counted in queue statistics (e.g. the number of messages in the queue).
per-queue TTL不会有问题,因为快要过期的消息总是在队列的前边;但是如果使用per-message TTL的话,过期的消息有可能会在未过期的消息后边,直到前边的消息过期或者被消费。因为RabbitMQ保证过期的消息一定不会被消费者消费,但是不能保证消息过期就会从队列中移除。
ActiveMQ
ActiveMQ from version 5.4 has an optional persistent scheduler built into the ActiveMQ message broker.
可以支持定时、延迟投递、重复投递和Cron调度。
在配置文件中,启用<broker ... schedulerSupport="true"> 选项后即可使用。
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);
ActiveMQ配置项介绍:
Property type Description
AMQ_SCHEDULED_DELAY false The time in milliseconds that a message will wait before
being scheduled to be delivered by the broker
AMQ_SCHEDULED_DELAY false 消息延迟发送的延迟时间(单位毫秒)
AMQ_SCHEDULED_PERIOD false The time in milliseconds after the start time to wait before
scheduling the message again
AMQ_SCHEDULED_PERIOD false 代理启动后,发送消息之前的等待时间(单位毫秒).
AMQ_SCHEDULED_REPEAT false The number of times to repeat scheduling a message for delivery
AMQ_SCHEDULED_REPEAT false 调度消息发送的重复次数
AMQ_SCHEDULED_CRON String Use a cron entry to set the schedule
AMQ_SCHEDULED_CRON String 使用一个cron实体设置消息发送调度
文章引自:http://zhangyp.net/rabbitmq-delayqueue/
缺点:复杂度比quartz要高,自己要处理
分布式
横向扩展的问题,因为数据是放在内存里,需要自己写持久化的备案以达到高可用。
3.利用wheelTimer:netty的HashedWheelTimer
优点:效率高,根据楼主自己写的测试,在大量高负荷的任务堆积的情况下,Hash...
当前项目中遇到这样一个需求: 将需要审核的文本提交给人工智能模型接口审核,等待模型接口审核完毕以后拿到审核结果返回给前端展示给用户(另:模型处理数据所消耗的时间会随着用户提交数据的复杂度有所变化)。,如果有多个用户在同一时间提交了文本审核需求,且恰好此时此刻模型需要较长时间处理,这样的话,按照通常的代码写法,是无法满足任务需求的,那么应该如何满足这个需求呢?此时此刻,我们可以使用。上述就是任务
队列
的Java
实现
基本过程。
分布式
延时任务方案解析一级目录方案
分析
(1)数据库轮询(2)JDK的
延迟
队列
(3)时间轮算法(4)redis缓存(5)使用
消息队列
补充
在开发中,往往会遇到一些关于延时任务的需求。例如
生成订单30分钟未支付,则自动取消
生成订单60秒后,给用户发短信
对上述的任务,我们给一个专业的名字来形容,那就是延时任务。那么这里就会产生一个问题,这个延时任务和定时任务的区别究竟在哪里呢?一共有如下几点区别
定时任务有明确的触发时间,延时任务没有
定时任务有执行周期,而延时任务在某事件触发后一
典型应用场景
在大数据数据收集环节,需要构建数据流水线,其中一种方式可以通过
消息队列
实现
。在实际应用中,不同服务器产生的日志,比如指标监控数据,用户搜索日志,用户点击日志等,需要同时传送到多个系统中以便进行相应的逻辑处理和挖掘。
消息队列
位于...
当消费者从
队列
中接收并处理一个消息时,该消息会留在
队列
中。Amazon SQS不会自动删除该消息。因为Amazon SQS是一个
分布式
系统,不能保证消费者真正收到消息(例如,由于连接问题,或由于消费者应用程序中的问题)。因此,消费者在接收和处理消息后必须从
队列
中删除该消息。
在收到一个消息后,它立即留在
队列
中。为了防止其他消费者再次处理该消息,Amazon SQS设置了一个可见性超时,即Amazon SQS防止其他消费者接收和处理该消息的时间段。一个消息的默认可见性超时是30秒。最小是0秒。最大的是12小
1.在https://console.aws.amazon.com/sqs/,打开Amazon SQS控制台。
2.在导航窗格中,选择
队列
。
3.在
队列
页面上,选择一个
队列
。
4.在行动中,选择发送和接收消息。
控制台显示发送和接收信息页面。
5.在消息正文中,输入消息文本。
6.对于先进先出(FIFO)
队列
,输入一个消息组ID。更多信
在RedissonDelayedQueue的构造方法会触发任务的调度,在这个任务里面会动态的触发定时任务的执行,这些定时任务会在任务过期时调用pushTaskAsync()方法,执行上面的Redis命令,将过期数据放入目标
延迟
队列
供消费者消费。Redisson往这个List里面放的时候使用的rpush命令,rpush命令的意思是往List的右边放。take()这个方法的核心原理非常简单,核心代码就是使用Redis的BLPOP命令,从Redis的List数据结构里面取数据,取不到就阻塞,一直等到有数据进来。
延时消息(定时消息)指的在
分布式
异步消息场景下,生产端发送一条消息,希望在指定延时或者指定时间点被消费端消费到,而不是立刻被消费。
延时消息适用的业务场景非常的广泛,在
分布式
系统环境下,延时消息的功能一般会在下沉到中间件层,通常是 MQ 中内置这个功能或者内聚成一个公共基础服务。
本文旨在探讨常见延时消息的
实现
方案以及方案
设计
的优缺点。
实现
方案
1. 基于外部存储
实现
的方案
这里讨论的外部存储指的是在 MQ 本身自带的存储以外又引入的其他的存储系统。
基于外部存储的方案本质上都是一..