96SEO 2026-02-20 01:47 14
在使用RabbitMQ的时候作为消息发送方希望杜绝任何消息丢失或者投递失败场景。

RabbitMQ
confirm确认模式是再producer传递给exchange过程中控制消息的模式当消息成功的从producer传递到了exchange那么则会返回一个
return退回模式是指消息从exchange传递给queue过程中消息传递失败则会返回一个returnCallBack()
因为确认模式是producer到exchange所以代码和配置修改应该写在生产者的模块中。
新版本的rabbitmq弃用了publish-confirmstrue可以改用
1.12.244.105#开启确认模式publisher-confirm-type:
correlated第二步编写confirmCallBack()函数
回调函数confirm()的返回值在发送消息成功时ack为true但是我遇到一个问题就是消息发送成功了在队列中也能看到但是返回值ack为false
这是因为convertAncSend()方法结束后rabbitMQ的资源也就关闭了所以就算成功了回调函数返回值也是false所以我们在后面强制睡眠200ms让资源晚点关闭这样的话得到的ack就是true了
com.rabbitmq.springboot_mqproducer;import
com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
org.springframework.amqp.rabbit.connection.CorrelationData;
org.springframework.amqp.rabbit.core.RabbitTemplate;
org.springframework.boot.test.context.SpringBootTest;import
javax.annotation.Resource;SpringBootTest
SpringbootMqProducerApplicationTests
{rabbitTemplate.setConfirmCallback(new
RabbitTemplate.ConfirmCallback()
{System.out.println(confirm方法被执行了);System.out.println(b);if(b){System.out.println(消息从producer
exchange成功);System.out.println(失败原因
s);}else{System.out.println(消息从producer
exchange失败);System.out.println(失败原因
s);}}});rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,test.hello,测试springboot整合交换机);Thread.sleep(200);}
1.12.244.105#开启确认模式publisher-confirm-type:
correlated#开启回退模式publisher-returns:
①setMandatory为true如果消息没有到队列queue则返回消息给发送方
②setMandatory为false如果消息没有到队列queue则丢弃消息默认
com.rabbitmq.springboot_mqproducer;import
com.rabbitmq.springboot_mqproducer.rabbitMQconfig.MQConfig;
org.springframework.amqp.core.ReturnedMessage;
org.springframework.amqp.rabbit.connection.CorrelationData;
org.springframework.amqp.rabbit.core.RabbitTemplate;
org.springframework.beans.propertyeditors.StringArrayPropertyEditor;
org.springframework.boot.test.context.SpringBootTest;import
javax.annotation.Resource;SpringBootTest
SpringbootMqProducerApplicationTests
{//编写confirm回调函数rabbitTemplate.setConfirmCallback(new
RabbitTemplate.ConfirmCallback()
{System.out.println(confirm方法被执行了);System.out.println(b);if(b){System.out.println(消息从producer
exchange成功);System.out.println(失败原因
s);}else{//消息发送失败需要做一些处理System.out.println(消息从producer
exchange失败);System.out.println(失败原因
s);}}});//编写return回调函数rabbitTemplate.setReturnsCallback(new
RabbitTemplate.ReturnsCallback()
returnedMessage(ReturnedMessage
{System.out.println(return回退模式回调函数执行了);System.out.println(消息returnedMessage.getMessage());System.out.println(exchangereturnedMessage.getExchange());System.out.println(replyCodereturnedMessage.getReplyCode());System.out.println(replyTextreturnedMessage.getReplyText());System.out.println(routingKeyreturnedMessage.getRoutingKey());}});//设置回退模式中exchange处理消息的方式/*当将mandatory设置为false默认值如果RabbitMQ无法将消息路由消息将会被静默丢弃生产者不会收到通知。
当设置mandatory为true时意味着消息被视为mandatory如果在发布消息时RabbitMQ无法将消息路由到任何队列例如由于没有匹配的队列与指定的路由键则代理将通过调用ReturnListener回调的returnedMessage方法将消息返回给生产者发布者。
生产者可以根据需要适当地处理这个返回的消息例如记录日志或执行某些恢复操作。
*/rabbitTemplate.setMandatory(true);//TODO
这里把routingKey写错是为了让交换机找不到queue从而触发returnCallBack()函数rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_NAME,testtttt.hello,测试springboot整合交换机);Thread.sleep(200);}}消息的可靠投递小结
correlated开启确认模式使用rabbitTemplate.setConfirmCallback设置回调函数。
当消息发送到exchange后回调confirm方法。
在方法中判断ack如果为true,
则发送成功如果为false,则发送失败需要处理。
设置ConnectionFactory的publisher-returnstrue开肩退回模式。
使用rabbitTemplate.setReturnCallback设置退回函数当消息从exchange路由到queue失败后如果设置了rabbitTemplate.setMandatory(true)参数则会将消息退回给producer。
并执行回调函数returnedMessage。
在RabbitMQ中也提供了事务机制但是性能较差此处不做讲解。
txSelect(),用于将当前channel设置成transaction模式
ack指Acknowledge,确认。
表示消费端收到消息后的确认方式。
自动确认acknowledge“none”手动确认acknowledge“manual”根据异常情况确认acknowledge“auto”这种方式很麻烦不做讲解
则自动确认收到并将相应message从RabbitMQ的消息缓存中移除。
但是在实际业务处理中很可能消息接收到业务处理出现异常,那么该消息就会丢失。
如果设置了手动确认方式则需要在业务处理成功后调用channel.basicAck()
手动签收如果出现异常则调用channel.basicNack()
5673#设置消息为手动签收listener:simple:acknowledge-mode:
auto通过抛出异常的类型来做响应的处理concurrency:
消费者端创建一个listener并实现ChannelAwareMessageListener接口其实也可以不实现该接口只要
标记的方法的参数列表有[com.rabbitmq.client.Channel]和[org.springframework.amqp.core.Message]两个参数都可以
com.rabbit.springboot_mqconsumer;import
org.springframework.amqp.core.Message;
org.springframework.amqp.rabbit.annotation.RabbitListener;
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
org.springframework.stereotype.Component;
com.rabbitmq.client.Channel;/***
{boot_topic_queue})//填写队列名称,可以以字符串数组的方式监听多个队列
System.out.println(messagemessage);
使用ChannelAwareMessageListener监听器接口中的onMessage()方法来充当消费者如果上面注释的方法与当前方法同时存在一条消息只会被消费一次。
不会被两个方法都消费**
2.让监听器类实现ChannelAwareMessageListener接口*
3.如果消息成功处理则调用channel的basicAck()签收*
4.如果消息处理失败则调用channel的basicNack(
)拒绝签收broker重新发送给consumer*/RabbitListener(queues
{try{//1.接收消息System.out.println(message
message);System.out.println(channel
channel);//2.处理业务逻辑System.out.println(模拟处理业务逻辑......);//3.手动签收/*void
IOException;deliveryTag:当消费者接收到一条消息后RabbitMQ
对应的一条消息。
也就是说只确认指定的单个消息已经成功被处理或处理失败。
当
的所有消息都确认处理了。
这种批量确认的机制有助于提高消息的处理效率特别是当消费者处理多条消息时可以通过一次性确认多条消息的方式来减少网络开销和消费者端的负担。
在使用
方法时可以根据实际场景来选择是单条确认还是批量确认以满足不同的业务需求。
*/channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);System.out.println(完成手动签收);}catch(Exception
e){//4.出现异常拒绝签收/*deliveryTag一个唯一标识消息的64位长整型数值用于确认消息的消费状态。
multiple一个布尔类型的参数用于决定是否批量处理多条消息。
若设置为
对应的一条消息。
requeue一个布尔类型的参数表示是否将消息重新放回队列。
若设置为
false则消息会被直接丢弃不会重新放回队列。
*/System.out.println(代码逻辑出现异常拒收);channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);}}
然后我们来测试业务逻辑代码出错的情况我们在业务逻辑代码处添加一个除数不能为0的异常
exchange要持久化queue要持久化message要持久化
2.生产方确认Confirm在后续文章中会讲解如何在回调函数中进行具体的处理
在A系统中每秒最多只能处理1000条请求如果在一秒钟只能瞬间有5000条请求打入A系统那么A系统就会崩溃所以我们在A系统中加入一个MQ中间件让5000个请求先发送到MQ然后A系统再分批次的从MQ中拉取1000条请求这样A系统就避免了崩溃的情况。
确认模式设置为手动确认在上面的Ack我们已经讲过设置prefetch属性prefetch
当设置了消费端限流后如果从MQ中取出1条消息消费者端没有进行确认那么消费者端将不会再从MQ中取消息直到消息被确认。
RabbitMQ可以对消息设置过期时间也可以对整个队列Queue设置过期时间。
生活中我们在购买商品的时候会下订单系统会提示我们要在30分钟之内付款否则订单将会被取消。
③进入交换机exchange_ttl和队列queue_ttl进行绑定
将鼠标放上ttl就可以看到设置的时间等时间一过这条消息就会被自动清除。
com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import
org.springframework.amqp.core.*;
org.springframework.beans.factory.annotation.Qualifier;
org.springframework.context.annotation.Bean;
org.springframework.context.annotation.Configuration;import
创建队列测试ttl特性*/Bean(test_queue_ttl)public
HashMap();arguments.put(x-message-ttl,10000);//消息过期的时间arguments.put(x-expires,100000);//队列过期的时间//设置队列的ttl时间return
QueueBuilder.durable(QUEUE_TTL_NAME).withArguments(arguments).build();//参数的属性可以在控制台上查看}/*
创建一个交换机测试队列ttl特性*/Bean(test_exchange_ttl)public
ExchangeBuilder.topicExchange(EXCHANGE_TTL_NAME).durable(true).build();}/*绑定ttl交换机和队列*/Beanpublic
ttlBinding(Qualifier(test_exchange_ttl)
BindingBuilder.bind(queue).to(exchange).with(ttl.#).noargs();}
}在创建队列时我们指定了x-message-ttl使队列中的所有消息都是一个固定的时间过期
只需要在发送方法convertAndSend()方法中添加一个消息后处理参数即可
中的一个接口用于对消息进行后处理。
通过实现该接口你可以在发送消息之前对消息进行一些自定义处理例如添加自定义的消息头、修改消息内容等。
*/MessagePostProcessor
{//1.设置消息属性message.getMessageProperties().setExpiration(5000);//5000ms过期//2.返回该消息return
{rabbitTemplate.convertAndSend(MQConfig.EXCHANGE_TTL_NAME,
测试ttli,messagePostProcessor);}Thread.sleep(200);}小细节
①当队列设置了x-expires和x-messgae-ttl消息过期时间以短的为准
②当队列设置了x-messgae-ttl且发送消息时通过消息后处理也设置了过期时间那么消息过期时间也以短的为准。
③当十条消息中只有一条消息设置了过期时间这条消息过期后只有处于队列顶端即即将被消费时才会对这条消息是否过期做判断。
message后,可以被重新发送到另一个交换机这个交换机就是DLX。
死信队列为什么英文翻译过来使死信交换机呢因为交换机概念只有在RabbitMQ中才有其它MQ中间件只有队列概念所以习惯叫死信队列而RabbitMQ中存在交换机概念所以叫死信交换机。
队列长度达到限制比如队列最多容纳10条消息当第11条消息进入时这条消息就成为了死信消息。
消费者拒接消费消息basicNack/basicReject,并且不把消息重新放入原目标队列,requeuefalse;原队列存在消息过期设置消息到达超时时间却并未被消费
队列设置参数x-dead-letter-exchange和x-dead-letter-routing-key
x-dead-letter-routing-key消息发送时指定的routingKey
1.声明正常的队列(test_queue_dLx)和交换机(test_exchange_dlx)2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)3.正常队列绑定死信交换机,正常队列绑定死信队列不需要创建Binding
x-dead-letter-routing-key:发送给死信交换机的routingkey
com.rabbitmq.springboot_mqproducer.rabbitMQconfig;import
org.springframework.amqp.core.*;
org.springframework.beans.factory.annotation.Qualifier;
org.springframework.context.annotation.Bean;
org.springframework.context.annotation.Configuration;import
测试死信队列*//*创建普通交换机和普通队列*/Bean(test_exchange_dlx)public
ExchangeBuilder.topicExchange(test_exchange_dlx).durable(true).build();}Bean(test_queue_dlx)public
HashMap();//x-dead-letter-exchange:死信交换机名称map.put(x-dead-letter-exchange,exchange_dlx);//x-dead-letter-routing-key:发送给死信交换机的routingkeymap.put(x-dead-letter-routing-key,dlx.hehe);//这个routingkey只需要满足死信交换机的路由规则就可以//设置正常队列中的消息的过期时间ttlmap.put(x-message-ttl,10000);//设置正常队列的长度限制max-lengthmap.put(x-max_length,10);return
QueueBuilder.durable(test_queue_dlx).withArguments(map).build();}Beanpublic
binding1(Qualifier(test_exchange_dlx)
exchange,Qualifier(test_queue_dlx)Queue
BindingBuilder.bind(queue).to(exchange).with(test.dlx.#).noargs();}/*创建死信交换机和死信队列*/Bean(exchange_dlx)public
ExchangeBuilder.topicExchange(exchange_dlx).durable(true).build();}Bean(queue_dlx)public
QueueBuilder.durable(queue_dlx).build();}Beanpublic
binding2(Qualifier(exchange_dlx)
exchange,Qualifier(queue_dlx)Queue
BindingBuilder.bind(queue).to(exchange).with(dlx.#).noargs();}/*绑定普通队列和死信交换机并不需要写一个Binding只需要在普通队列中添加参数就行*/
rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.hello,测试消息超出过期时间变成死信);//2.超出队列消息数量限制
rabbitTemplate.convertAndSend(test_exchange_dlx,
}//3.消费端拒收rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.hello,测试消息被拒收变成死信);}死信队列小结
2.当消息成为死信后,如果该队列绑定了死信交换机,则消息会被重新路由到死信队列中
消息在队列中到达超时时间并未被消费消息在消费者端被拒收,且设置了不重回队列队列长度存在限制,消息数量超出了限制
延迟队列即消息进入队列后不会立即被消费只有到达指定时间后才会被消费。
订单系统将订单放入延迟队列种,30分钟后取出,去库存系统中判断订单是否已经支付,再进行后续的支付或者未支付操作
RabbitMQ官方没有提供延迟队列,所以我们需要使用ttl死信队列构成延迟队列
普通队列设置为30min中过期,过期后消息路由到死信队列,库存系统从死信队列中取消息,这样就形成了一个延迟队列
1.定义正常交换机(order_exchange)和队列(order_queue)同时绑定
测试延迟队列*//*1.定义正常交换机(order_exchange)和队列(order_queue)*/Bean(orderQueue)public
orderQueue(){//3.正常队列绑定死信交换机MapString,Object
HashMap();map.put(x-dead-letter-exchange,order_exchange_dlx);map.put(x-dead-letter-routing-key,dlx.order.hehe);//设置正常队列的消息过期时间map.put(x-message-ttl,10000);return
QueueBuilder.durable(order_queue).withArguments(map).build();}Bean(orderExchange)public
ExchangeBuilder.topicExchange(order_exchange).build();}Beanpublic
orderBinding(Qualifier(orderQueue)Queue
queue,Qualifier(orderExchange)Exchange
BindingBuilder.bind(queue).to(exchange).with(order.#).noargs();}/*2.定义死信交换机(order_exchange_dlx)
和队列(order_queue_dlx)*/Bean(orderQueueDlx)public
QueueBuilder.durable(order_queue_dlx).build();}Bean(orderExchangeDlx)public
ExchangeBuilder.topicExchange(order_exchange_dlx).build();}Beanpublic
orderBindingDlx(Qualifier(orderQueueDlx)Queue
queue,Qualifier(orderExchangeDlx)Exchange
BindingBuilder.bind(queue).to(exchange).with(dlx.order.#).noargs();}4.创建生产者发送消息
{rabbitTemplate.convertAndSend(order_exchange,order.test,测试延迟队列);for
0;i--){System.out.println(i...);Thread.sleep(1000);}}5.创建消费者
com.rabbit.springboot_mqconsumer;import
org.springframework.amqp.core.Message;
org.springframework.amqp.rabbit.annotation.RabbitListener;
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
org.springframework.stereotype.Component;/***
order_queue_dlx)//监听死信队列Overridepublic
{try{//1.接收messageSystem.out.println(message:message);//2.处理业务逻辑System.out.println(处理业务逻辑);System.out.println(根据订单id在数据库中查询订单状态);System.out.println(判断订单是否支付成功);System.out.println(未支付回滚库存取消订单);//3.手动签收channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch
e){//4.业务出错拒绝签收channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,true);//业务出错拒签后要将这条消息重新放回死信队列}}
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback