96SEO 2026-02-20 08:41 14
延时队列在项目中的应用还是比较多的#xff0c;尤其像…一、延时队列的应用

顾名思义首先它要具有队列的特性再给它附加一个延迟消费队列消息的功能也就是说可以指定队列中的消息在哪个时间点被消费。
下订单成功后在30分钟内没有支付自动取消订单外卖平台发送订餐通知下单成功后60s给用户推送短信。
如果订单一直处于某一个未完结状态时及时处理关单并退还库存淘宝新建商户一个月内还没上传商品信息将冻结商铺等用户登录之后5分钟给用户做分类推送用户多少天未登录给用户做召回推送关闭空闲连接。
服务器中有很多客户端的连接空闲一段时间之后需要关闭之。
清理过期数据业务上。
比如缓存中的对象超过了空闲时间需要从缓存中移出。
任务超时处理。
在网络协议滑动窗口请求应答式交互时处理超时未响应的请求。
新创建店铺N天内没有上传商品系统如何知道该信息并发送激活短信
定时任务有明确的触发时间延时任务没有定时任务有执行周期而延时任务在某事件触发后一段时间内执行没有执行周期定时任务一般执行的是批处理操作是多个任务而延时任务一般是单个任务
业界目前也有很多实现方案单机版的方案就不说了现在也没有哪个公司还是单机版的服务今天我们一一探讨各种方案的大致实现。
个人一直秉承的观点工作上能用JDK自带API实现的功能就不要轻易自己重复造轮子或者引入三方中间件。
一方面自己封装很容易出问题大佬除外再加上调试验证产生许多不必要的工作量另一方面一旦接入三方的中间件就会让系统复杂度成倍的增加维护成本也大大的增加。
通过一个线程定时的去扫描数据库通过下订单时间来判断是否有超时的订单然后进行update或delete等操作
对于未支付的订单需要进行定期扫描判断其产生时间有没有超过固定的时间如果超过则取消订单。
这样做很不好因为
扫描数据库每次扫描整个表非常耗时因为一直在阻塞很耗费资源所谓的定期扫描扫描时间的粒度太小就会造成资源浪费扫描的粒度太大就会造成订单没有及时删除
常见的定时任务主要有quartz、xxl-job、spring系列的task、linux自带的
检测如果未支付则自动取消订单生成订单60s后给客户发短信Date
process(){System.out.println(我是定时任务\n
{System.out.println(取消订单逻辑处理);/***
此处省略1000万行代码*/System.out.println(发送短信提醒客户订单已取消);}}/***
DateTools.string2DateTime(2020-07-22
检测如果未支付则自动取消订单生成订单60s后给客户发短信*/executeTime
DateUtils.addMinutes(submitOrder,30);/***
检测如果未支付则自动取消订单生成订单60s后给客户发短信Date
JobBuilder.newJob(QuartzDelayQueueDemo.class).withIdentity(job1,
TriggerBuilder.newTrigger().withIdentity(trigger1,
group3).withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3).repeatForever()).build();Scheduler
StdSchedulerFactory().getScheduler();//
将任务及其触发器放入调度器scheduler.scheduleJob(jobDetail,
调度器开始调度任务scheduler.start();}Overridepublic
{System.out.println(取消订单逻辑处理);/***
此处省略1000万行代码*/System.out.println(发送短信提醒客户订单已取消);}}
对服务器内存消耗大存在延迟比如你每隔3分钟扫描一次那最坏的延迟时间就是3分钟假设你的订单有几千万条每隔几分钟这样扫描一次数据库损耗极大数据量过大时会消耗太多的IO资源效率太低
中提供了一组实现延迟队列的API位于Java.util.concurrent包下DelayQueue。
DelayQueue是一个BlockingQueue无界阻塞队列它本质就是封装了一个PriorityQueue优先队列PriorityQueue内部使用完全二叉堆不知道的自行了解哈来实现队列元素排序我们在向DelayQueue队列中添加元素时会给元素一个Delay延迟时间作为排序条件队列中最小的元素会优先放在队首。
队列中的元素只有到了Delay时间才允许从队列中取出。
队列中可以放基本数据类型或自定义实体类在存放基本数据类型时优先队列中元素默认升序排列自定义实体类就需要我们根据类属性值比较计算了。
先简单实现一下看看效果添加三个order入队DelayQueue分别设置订单在当前时间的5秒、10秒、15秒后取消。
要实现DelayQueue延时队列队中元素要implements
接口这个接口里只有一个getDelay方法用于设置延期时间。
Order类中compareTo方法负责对队列中的元素进行排序。
poll()获取并移除队列的超时元素没有则返回空take()获取并移除队列的超时元素如果没有则wait当前线程直到有元素满足超时条件返回结果。
BlockingQueuePriorityQueue堆排序DelayedDelayQueue中存放的对象需要实现compareTo()方法和getDelay()方法getDelay方法返回该元素距离失效还剩余的时间当0时元素就失效了就可以从队列中获取到
System.currentTimeMillis();}/***
https://www.runoob.com/java/java-string-compareto.html*
{System.out.println(orderNumber
DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis()));}}DelayQueue的put方法是线程安全的因为put方法内部使用了ReentrantLock锁进行线程同步。
TimeUnit.SECONDS);DelayQueueDelayOrder
DelayQueue();delayOrders.put(orderNumber1);delayOrders.put(orderNumber2);delayOrders.put(orderNumber3);System.out.println(订单延迟队列开始时间:
LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd
take()获取并移除队列的超时元素如果没有则wait当前线程直到有元素满足超时条件返回结果。
*/
LocalDateTime.now().format(DateTimeFormatter.ofPattern(yyyy-MM-dd
}上边只是简单的实现入队与出队的操作实际开发中会有专门的线程负责消息的入队与消费。
5秒、10秒、15秒后被执行至此就用DelayQueue实现了延时队列。
服务器重启后数据全部消失怕宕机集群扩展相当麻烦因为内存条件限制的原因比如下单未付款的订单数太多那么很容易就出现OOM异常代码复杂度较高分布式需要额外实现无法指定绝对的日期或时间
可重入锁用于根据delay时间排序的优先级队列用于优化阻塞通知的线程元素leader用于实现阻塞和通知的Condition对象
在理解delayQueue原理之前我们需要先了解两个东西,delayed和PriorityQueue.
delayed是一个具有过期时间的元素PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列
delayQueue其实就是在每次往优先级队列中添加元素然后以元素的delay/过期值作为排序的因素以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素
执行加锁操作把元素添加到优先级队列中查看元素是否为队首如果是队首的话设置leader为空唤醒所有等待的队列释放锁
null;available.signal();}return
执行加锁操作取出优先级队列元素q的队首如果元素q的队首/队列为空,阻塞请求如果元素q的队首(first)不为空,获得这个元素的delay时间值如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露判断leader元素是否为空,不为空的话阻塞当前线程如果leader元素为空的话,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队首到达可以出队的时间,在finally块中释放leader元素的引用循环执行从1~8的步骤如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程执行解锁操作
this.lock;lock.lockInterruptibly();try
null)available.signal();lock.unlock();}}get点
整个代码的过程中并没有使用上太难理解的地方,但是有几个比较难以理解他为什么这么做的地方
大家可能看到在我们的DelayQueue中有一个Thread类型的元素leader,那么他是做什么的呢,有什么用呢
上面主要的意思就是说用leader来减少不必要的等待时间,那么这里我们的DelayQueue是怎么利用leader来做到这一点的呢:
这里我们想象着我们有个多个消费者线程用take方法去取,内部先加锁,然后每个线程都去peek第一个节点.
null)available.await();如果为空说明没有其他线程去取这个节点,设置leader并等待delay延时到期,直到poll后结束循环
,设置了leader为当前线程A线程B进来获取first,进入else的阻塞操作,然后无限期等待这时在JDK
1.7下面他是持有first引用的如果线程A阻塞完毕,获取对象成功,出队,这个对象理应被GC回收,但是他还被线程B持有着,GC链可达,所以不能回收这个first.假设还有线程C
持有对象1引用,那么无限期的不能回收该对象1引用了,那么就会造成内存泄露.
java.util.Timer但官方推荐使用功能更为强大全面的
ScheduledThreadPoolExecutor因为后者支持多任务作业即线程池
Executors.newScheduledThreadPool(5);System.out.println(创建5秒延迟的任务,时间
DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis()));ScheduledFuture?
scheduledExecutorService.schedule(()
TimeUnit.SECONDS);Thread.sleep(4900);schedule.cancel(false);System.err.println(取消5秒延迟的任务时间
DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis()));System.out.println(创建3秒延迟的任务时间
DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis()));ScheduledFuture?
scheduledExecutorService.schedule(new
TimeUnit.SECONDS);Thread.sleep(4000);}执行结果
TaskQueue中的排序是对TimerTask中的下一次执行时间进行堆排序每次去取数组第一个。
而delayQueue是对queue中的元素的getDelay()结果进行排序
Timer是一种定时器工具用来在一个后台线程计划执行指定任务。
它可以计划执行一个任务一次或反复多次。
DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis()));this.cancel();}},
5sSystem.out.println(本程序存在5秒后自动退出,时间
DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis()));}
(只可以设置开始时间和重复间隔不是基于时间、日期、天等秒、分、时的)Timers
不能利用线程池一个timer一个线程Timers没有真正的管理计划
延时队列JDK,无法指定绝对的日期或时间无法高可用节点挂了任务不能跑
Redis的数据结构Zset同样可以实现延迟队列的效果主要利用它的score属性redis通过score来为集合中的成员进行从小到大的排序。
添加三个order1、order2、order3分别是10秒、20秒、30秒后过期。
将元素排序后取最小时间与当前时间比对如小于当前时间代表已经过期移除key。
RedisZsetDelayQueue();redisDelay.pushOrderQueue();redisDelay.pollOrderQueue();redisDelay.deleteZSet();}/***
JedisClient.JedisClient();Calendar
Calendar.getInstance();cal1.add(Calendar.SECOND,
Calendar.getInstance();cal2.add(Calendar.SECOND,
Calendar.getInstance();cal3.add(Calendar.SECOND,
order1*/jedis.zadd(DELAY_QUEUE,
order1);jedis.zadd(DELAY_QUEUE,
order2);jedis.zadd(DELAY_QUEUE,
order3);System.out.println(DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis())
JedisClient.JedisClient();jedis.del(DELAY_QUEUE);}/***
JedisClient.JedisClient();while
jedis.zrangeWithScores(DELAY_QUEUE,
set.toArray()[0]).getElement();int
set.toArray()[0]).getScore();Calendar
移除有序集合中的一个或多个成员jedis.zrem(DELAY_QUEUE,
value);System.out.println(DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis())
{System.out.println(DateTools.getLong2YyyyMmDdHhMmSs(System.currentTimeMillis())
的key过期回调事件也能达到延迟队列的效果简单来说我们开启监听key是否过期的事件一旦key过期会触发一个callback事件。
修改redis.conf文件开启notify-keyspace-events
在线文档https://redis.io/topics/notifications
Ex#############################
###############################
http://redis.io/topics/notifications
{BeanRedisMessageListenerContainer
container(RedisConnectionFactory
RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);return
}编写Redis过期回调监听方法必须继承KeyExpirationEventMessageListener
KeyExpirationEventMessageListener
RedisKeyExpirationListener(RedisMessageListenerContainer
{super(listenerContainer);}Overridepublic
message.toString();System.out.println(监听到key
}到这代码就编写完成非常的简单接下来测试一下效果在redis-cli客户端添加一个key
由于使用Redis作为消息通道消息都存储在Redis中。
如果发送程序或者任务处理程序挂了重启之后还有重新处理数据的可能性。
做集群扩展相当方便时间准确度高
顾名思义指的是消息的存活时间RabbitMQ可以通过x-message-tt参数来设置指定Queue队列和
Message消息上消息的存活时间它的值是一个非负整数单位为微秒。
设置队列过期时间那么队列中所有消息都具有相同的过期时间。
设置消息过期时间对队列中的某一条消息设置过期时间每条消息TTL都可以不同。
如果同时设置队列和队列中消息的TTL则TTL值以两者中较小的值为准。
而队列中的消息存在队列中的时间一旦超过TTL过期时间则成为Dead
ExchangesDLXDLX即死信交换机绑定在死信交换机上的即死信队列。
RabbitMQ的
Queue队列可以配置两个参数x-dead-letter-exchange
x-dead-letter-routing-key可选一旦队列内出现了Dead
Letter死信则按照这两个参数可以将消息重新路由到另一个Exchange交换机让消息重新被消费。
x-dead-letter-exchange队列中出现Dead
exchange交换机。
x-dead-letter-routing-key指定routing-key发送一般为要指定转发的队列。
消息或者队列的TTL过期队列达到最大长度消息被消费端拒绝basic.reject
我们在下单成功时开启一个延迟消息队列order.delay.queue
并设置x-message-tt消息存活时间为30分钟当到达30分钟后订单消息A0001成为了Dead
Letter死信延迟队列检测到有死信通过配置x-dead-letter-exchange将死信重新转发到能正常消费的关闭订单的消息队列消费端直接监听关闭订单消息队列后检查数据库支付状态如果未支付则关闭该订单。
{amqpTemplate.convertAndSend(order.pay.exchange,
设置延迟毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return
QueueBuilder.durable(RabbitConstant.DEAD_LETTER_QUEUE)//
配置到期后转发的交换机.withArgument(x-dead-letter-exchange,
配置到期后转发的路由键.withArgument(x-dead-letter-routing-key,
高效,可以利用rabbitmq的分布式特性轻易的进行横向扩展,消息支持持久化增加了可靠性。
缺点本身的易用度要依赖于rabbitMq的运维.因为要引用rabbitMq,所以复杂度和成本变高
前边几种延时队列的实现方法相对简单比较容易理解时间轮算法就稍微有点抽象了。
kafka、netty都有基于时间轮算法实现延时队列下边主要实践Netty的延时队列讲一下时间轮是什么原理。
当添加一个定时、延时任务A假如会延迟25秒后才会执行可时间轮一圈round
index也是就任务A会绕一圈指向0格子上此时时间轮会记录该任务的round和
都会指向0格子上而任务则放在每个格子对应的链表中这点和HashMap的数据有些类似。
Netty构建延时队列主要用HashedWheelTimerHashedWheelTimer底层数据结构依然是使用DelayedQueue只是采用时间轮的算法来实现。
简单实现延时队列HashedWheelTimer构造函数比较多解释一下各参数的含义。
tickDuration和unit每格的时间间隔默认100ms
ticksPerWheel一圈下来有几格默认512而如果传入数值的不是2的N次方则会调整为大于等于该参数的一个2的N次方数值有利于优化hash值的计算。
TimerTask一个定时任务的实现接口其中run方法包装了定时任务的逻辑。
Timeout一个定时任务提交到Timer之后返回的句柄通过这个句柄外部可以取消这个定时任务并对定时任务的状态进行一些基本的判断。
Timer是HashedWheelTimer实现的父接口仅定义了如何提交定时任务和如何停止整个定时机制。
HashedWheelTimer(Executors.defaultThreadFactory(),
TimeUnit.SECONDS);//结束时候再次注册}};timer.newTimeout(task1,
TimeUnit.SECONDS);//结束时候再注册}};timer.newTimeout(task2,
TimeUnit.SECONDS);//该任务仅仅运行一次timer.newTimeout(new
从执行的结果看order3、order3延时任务只执行了一次而order2、order1为定时任务按照不同的周期重复执行。
在发送延时消息的时候并不是先投递到要发送的真实主题real_topic中而是先投递到一些
内部的主题delay_topic中这些内部主题对用户不可见然后通过一个自定义的服务拉取这些内部主题中的消息并将满足条件的消息再投递到要发送的真实的主题中消费者所订阅的还是真实的主题。
如果采用这种方案那么一般是按照不同的延时等级来划分的比如设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级延时的消息按照延时时间投递到不同等级的主题中投递到同一主题中的消息的延时时间会被强转为与此主题延时等级一致的延时时间这样延时误差控制在两个延时等级的时间差范围之内比如延时时间为17s的消息投递到30s的延时主题中之后按照延时时间为30s进行计算延时误差为13s。
虽然有一定的延时误差但是误差可控并且这样只需增加少许的主题就能实现延时队列的功能。
发送到内部主题delay*topic**中的消息会被一个独立的
的消息并转发到真实的主题中。
从消费、暂存再到转发线程之间都是一一对应的关系。
如下图所示DelayService
不会因为未处理的消息过多而导致内存的占用过大DelayService
会对主题中的每个分区进行计数当达到一定的阈值之后就会暂停拉取该分区中的消息。
因为一个主题中一般不止一个分区分区之间的消息并不会按照投递时间进行排序DelayQueue的作用是将消息按照再次投递时间进行有序排序这样下游的消息发送线程就能够按照先后顺序获取最先满足投递条件的消息。
为了DelayQueue排序将消息放到一个分区这样是简化了设计但是丢失了动态的扩展性无法提升单个分区的性能有一定的延时误差无法做到秒级别的精确延时消息在拉取一批消息后如果这批消息未达到延迟时间那么将重新写入主题重新消费但是当消费严重滞后消息堆积时重新写入主题到再消费可能需要已经过了指定时间
乎需要庞大的系统开销就单单文件句柄的使用也会耗费很多的系统资源
相近位置的部分文件即可其余都可以用类似“懒加载”的机制来维持若与时间格对应的文件不存在则
可以新建若与时间格对应的文件未加载则可以重新加载整体上造成的时延相比于延时等级
采用多层时间轮设计到时间轮降级那么会有频繁的文件在磁盘写入和删除很影响性能
可以解决延时进度的问题可以解决内存暴涨的问题方案一:通过给DelayQueue
生产者同样将消息写入多个备份单层文件时间轮〉待时间轮转动而触发某些时间格过期时就可以将时间格
对应的文件内容也就是延时消息〉转发到真实主题中并且删除相应的文件。
与此同时还
等级的实现方案毕竟实现起来简单明了。
反之如果要求高精度或自定义延时时间那么可以选择单层文件时间轮的方案。
写作一点也不比上班干活轻松查证资料反复验证demo的可行性搭建各种RabbitMQ、Redis环境只想说我太难了
目前开源的延迟消息也就pulsar比较可靠其次是rocketmq但rocket只支持18个等级的固定刻度pulsar也是近几天内的任意时间长时间不行。
其他的rabbit不适合大量堆积redis
set在field超过一定量时预计5000左右会产生性能问题需要做二次分片。
jdk的delayqueue性能就不提了只能做短时、量少的延迟消息而且还没有ack可能存在丢失如果要写入数据库也很麻烦。
内存时间轮性能好点但也没法ack而且消费阻塞会引起问题单线程
目前最佳实践就是组合以上的几种定时任务pulsar/rocket本身时间轮等多重组合可以实现海量超长时间的延迟消息
作为下一代云原生分布式消息流平台支持多租户、持久化存储、多机房跨区域数据复制具有强一致性、高吞吐以及低延时的高可扩展流数据存储特性内置诸多其他系统商业版本才有的特性是云原生时代解决实时消息流数据传输、存储和计算的最佳解决方案。
已经应用部署在国内外众多大型互联网公司和传统行业公司案例分布在人工智能、金融、电信运营商、直播与短视频、物联网、零售与电子商务、在线教育等多个行业如美国有线电视网络巨头Comcast、Yahoo、腾讯、中国电信、中国移动、BIGO、VIPKID
Bucket是一组以时间为维度的有序队列用来存放所有需要延迟的已经被reserve的Job这里只存放Job
Id。
Timer负责实时扫描各个Bucket并将delay时间大于等于当前时间的Job放入到对应的Ready
用户对某个商品下单系统创建订单成功同时往延迟队列里put一个job。
job结构为{‘topic’:‘orderclose’,
‘body’:’XXXXXXX’}延迟队列收到该job后先往job
pool中存入job信息然后根据delay计算出绝对执行时间并以轮询(round-robbin)的方式将job
id放入某个bucket。
timer每时每刻都在轮询各个bucket当1800秒30分钟过后检查到上面的job的执行时间到了取得job
pool中获取元信息。
如果这时该job处于deleted状态则pass继续做轮询如果job处于非deleted状态首先再次确认元信息中delay是否大于等于当前时间如果满足则根据topic将job
queue然后从bucket中移除如果不满足则重新计算delay时间再次放入bucket并将之前的job
id从bucket中移除。
消费端轮询对应的topic的ready
queue这里仍然要判断该job的合理性获取job后做自己的业务逻辑。
与此同时服务端将已经被消费端获取的job按照其设定的TTR重新计算执行时间并将其放入bucket。
消费端处理完业务后向服务端响应finish服务端根据job
job的时候采用的是http短轮询的方式且每次只能取的一个job。
如果ready
job较多的时候会加大网络I/O的消耗。
数据存储使用的redis消息在持久化上受限于redis的特性。
scale-out的时候依赖第三方nginx。
官方文档SchedulerX阿里分布式任务调度平台目前公测期免费地址快速访问
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback