96SEO 2026-02-19 21:33 16
。

一般用来解决应用解耦#xff0c;异步消息#xff0c;流量削峰等问题#xff0c;实现高性能#xff0c;高可用#xf…RabbitMq
mq就是消息队列消息队列遵循这先入先出原则。
一般用来解决应用解耦异步消息流量削峰等问题实现高性能高可用可伸缩和最终一致性架构。
Erlang语言编写即需要先安装部署Erlang环境再安装RabbitMQ环境。
erlang-21.3-1.el7.x86_64.rpm#安装socat
rabbitmq-server-3.8.8-1.el7.noarch.rpm
xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.chen/groupIdartifactIdmq/artifactIdversion1.0-SNAPSHOT/versionpropertiesrabbitmq.version5.8.0/rabbitmq.versioncommon.version2.6/common.version/propertiesdependencies
https://mvnrepository.com/artifact/com.rabbitmq/amqp-client
--dependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion${rabbitmq.version}/version/dependencydependencygroupIdcommons-io/groupIdartifactIdcommons-io/artifactIdversion${common.version}/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdconfigurationsource8/sourcetarget8/target/configuration/plugin/plugins/build
com.rabbitmq.client.Connection;
com.rabbitmq.client.ConnectionFactory;/***
连接地址ipfactory.setHost(172.17.18.162);
密码factory.setPassword(123456);//
参数五其他参数channel.queueDeclare(MQ_KEY,true,false,false,null);/***
4.发送消息的消息体*/channel.basicPublish(,MQ_KEY,null,holle
word.getBytes());System.out.println(消息发送成功);}}
ConnectionFactory();factory.setHost(172.17.18.162);factory.setUsername(admin);factory.setPassword(123456);
factory.newConnection();Channel
connection.createChannel();/*参数1:
手动3.消费者未成功的回调4.消费者取录成功的回调*/channel.basicConsume(MQ_KEY,
String(message.getBody())),(CancelCallback)
System.out.println(consumerTag));}}
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务而不得不等待它完成。
相反我们安排任务在之后执行。
我们把任务封装为消息并将其发送到队列。
在后台运行的工作进程将弹出任务并最终执行作业。
当有多个工作线程时这些工作线程将一起处理这些任务。
com.chen.rabbitmq.tow.utils;import
com.rabbitmq.client.Connection;
com.rabbitmq.client.ConnectionFactory;public
ConnectionFactory();factory.setHost(172.17.18.162);factory.setUsername(admin);factory.setPassword(123456);
factory.newConnection();Channel
connection.createChannel();return
com.chen.rabbitmq.tow.test;import
com.chen.rabbitmq.tow.utils.RabbitUtils;
com.rabbitmq.client.Channel;import
RabbitUtils.rabbitConnection();Scanner
Scanner(System.in);//生产队列channel.queueDeclare(MQ_KEY,true,false,false,null);while
scanner.next();channel.basicPublish(,MQ_KEY,null,next.getBytes());System.out.println(消息发布成功-
com.chen.rabbitmq.tow.test;import
com.chen.rabbitmq.tow.utils.RabbitUtils;
com.rabbitmq.client.CancelCallback;
com.rabbitmq.client.DeliverCallback;public
RabbitUtils.rabbitConnection();channel.basicConsume(MQ_KEY,true,(DeliverCallback)(consumerTag,message)-{System.out.println(new
String(message.getBody()));},(CancelCallback)(tag)-{System.out.println(tag);System.out.println(中断了);});}public
AcknowledgementAuto-ack是一种消息确认机制用于标记消息是否已被成功接收和处理。
了解自动应答的概念对于构建可靠、高效的消息传递系统非常重要。
的消息时通常会使用消息确认acknowledgements机制来告知
就可以确保消息不会意外丢失。
然而这种确认过程可能会导致一定的延迟和额外开销。
为了解决这个问题RabbitMQ
会立即将该消息标记为已处理。
这意味着消费者不需要显式地发送确认ack消息给
RabbitMQ。
这种机制可以降低延迟提高消息传递的速度但是也存在一定的风险。
因为消息一旦被发送出去RabbitMQ
就认为它已经成功处理而实际上消费者可能还没有完成对消息的处理。
如果消费者在处理消息时发生故障那么这个消息可能会丢失。
{this.delegate.basicAck(deliveryTag,
com.chen.rabbitmq.tow.utils.RabbitUtils;
java.nio.charset.StandardCharsets;
RabbitUtils.rabbitConnection();channel.queueDeclare(MQ_KEY,true,false,false,null);Scanner
scanner.next();channel.basicPublish(,MQ_KEY,null,scanner.next().getBytes());System.out.println(消息发布成功-
com.chen.rabbitmq.tow.utils.RabbitUtils;
com.rabbitmq.client.CancelCallback;
com.rabbitmq.client.DeliverCallback;//消费者1
RabbitUtils.rabbitConnection();channel.basicConsume(MQ_KEY,false,(DeliverCallback)
{Thread.sleep(1*1000);System.out.println(Word1接收到消息-new
参数二是否批量channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}
{e.printStackTrace();}},(CancelCallback)
com.chen.rabbitmq.tow.utils.RabbitUtils;
com.rabbitmq.client.CancelCallback;
com.rabbitmq.client.DeliverCallback;//消费者1
RabbitUtils.rabbitConnection();channel.basicConsume(MQ_KEY,false,(DeliverCallback)
{Thread.sleep(10*1000);System.out.println(Word2接收到消息-new
参数二是否批量channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}
{e.printStackTrace();}},(CancelCallback)
经测试会发现消费者1为第一个接收到消息接下来当生产者在生产出一条消息应到消费者2接收到消息但是此时消费者2突然出现宕机使用了应答机制消息则会重新打到消费者1
channel.queueDeclare(MQ_KEY,true,false,false,null);
RabbitUtils.rabbitConnection();channel.queueDeclare(MQ_KEY,true,false,false,null);Scanner
//MessageProperties.PERSISTENT_TEXT_PLAIN
消息持久化channel.basicPublish(,MQ_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,scanner.next().getBytes());System.out.println(消息发布成功-
RabbitUtils.rabbitConnection();
channel.confirmSelect();//开启发布确认
这个发布确认是同步的需等待确认一次在发布下一次一手交钱一手交货原则
RabbitUtils.rabbitConnection();//开启发布确认channel.confirmSelect();String
UUID.randomUUID().toString();//创建队列channel.queueDeclare(uuid,true,false,false,null);//开始时间long
;channel.basicPublish(,uuid,null,message.getBytes());//发布确认boolean
channel.waitForConfirms();if(flag){System.out.println(消息确认成功);}}long
System.currentTimeMillis();System.out.println(耗时(last-begin));}
发布速度相对单个发布确认要快但是当其中一条消息出现异常将无法查找到那个消息丢失
RabbitUtils.rabbitConnection();String
UUID.randomUUID().toString();//开启消息确认channel.confirmSelect();//创建队列channel.queueDeclare(uuid,true,false,false,null);//这个这个变量用记录发布值Integer
messagei;//发布消息channel.basicPublish(,uuid,null,message.getBytes());if(messageCount.equals(record)){channel.waitForConfirms();record0;}}long
System.currentTimeMillis();System.out.println(耗时(last-begin));}
异步确认虽然比上的两个代码复杂但同时也解决了上面两种方式遗留下来的问题。
RabbitUtils.rabbitConnection();//开启发布确认channel.confirmSelect();String
UUID.randomUUID().toString();//创建队列channel.queueDeclare(uuid,true,false,false,null);//开始时间long
----》处理异步未确认的消息ConcurrentSkipListMapLong,
监听消息channel.addConfirmListener((deliveryTag,
multiple)-{if(multiple){ConcurrentNavigableMapLong,
longStringConcurrentNavigableMap
map.headMap(deliveryTag);longStringConcurrentNavigableMap.clear();}else{map.remove(deliveryTag);}System.out.println(确认消息deliveryTag);},(deliveryTag,
map.get(deliveryTag);System.out.println(发送失败的数据是message未确认消息deliveryTag-----失败);});for
messagei;channel.basicPublish(,uuid,null,message.getBytes());//获取信道的标识存入消息map.put(channel.getNextPublishSeqNo(),message);}long
System.currentTimeMillis();System.out.println(耗时(last-begin));}
在上面中的所有例子都是尊寻这轮询的规则去执行的问题:当其中的一台服务响应特别慢时就会影响到整体的效率。
RabbitUtils.rabbitConnection();//设置不公平分发channel.basicQos(1);channel.basicConsume(MQ_KEY,false,(DeliverCallback)
{//模拟虚拟机延迟Thread.sleep(1*1000);System.out.println(Word2接收到消息-new
String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}
{e.printStackTrace();}},(CancelCallback)
RabbitUtils.rabbitConnection();//设置预期值channel.basicQos(3);channel.basicConsume(MQ_KEY,false,(DeliverCallback)
{//模拟虚拟机延迟Thread.sleep(1*1000);System.out.println(Word2接收到消息-new
String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}
{e.printStackTrace();}},(CancelCallback)
RabbitUtils.rabbitConnection();//设置预期值channel.basicQos(5);
channel.basicConsume(MQ_KEY,false,(DeliverCallback)
{//模拟虚拟机延迟Thread.sleep(10*1000);System.out.println(Word2接收到消息-new
String(message.getBody()));channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}
{e.printStackTrace();}},(CancelCallback)
在RabbitMQ中生产者发送消息不会直接将消息投递到队列中而是先将消息投递到交换机中
与交换机产生关系并且能有routekey控制发送消息给哪个队列。
扇形交换机是最基本的交换机类型它所能做的事清非常简单广播消息。
扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。
因为广播不需要思考”所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
RabbitUtils.rabbitConnection();
声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME,fanout);
channel.queueDeclare().getQueue();
绑定交换机与队列channel.queueBind(queue,EXCHANGE_NAME,);System.out.println(等待消息~);//消费者取消消息时回调接口channel.basicConsume(queue,true,
(consumerTag,message)-{System.out.println(word1控制台打印接收消息:new
String(message.getBody(),UTF-8));},cancelCallback-{});}
RabbitUtils.rabbitConnection();
声明一个交换机channel.exchangeDeclare(EXCHANGE_NAME,fanout);
channel.queueDeclare().getQueue();
绑定交换机与队列channel.queueBind(queue,EXCHANGE_NAME,);System.out.println(等待消息~);
//消费者取消消息时回调接口channel.basicConsume(queue,true,
(consumerTag,message)-{System.out.println(word2控制台打印接收消息:new
String(message.getBody(),UTF-8));},cancelCallback-{});}
RabbitUtils.rabbitConnection();Scanner
scanner.next();channel.basicPublish(EXCHANGE_NAME,,null,next.getBytes(UTF-8));System.out.println(生产者发送消息next);}}
代码几乎类型fanout交换机只需要指定routerkey即可。
必须是由点号分开的一串单词这些单词可以是任意的但通常是与消息相关的一些特征。
发送routerkey为ws.orange.rabbit那么对应的就是Q1Q2
发送routerkey为lazy.orange.elephant那么对应的就是Q1Q2
EXCHANGE_NAMEtopic_logs;private
RabbitUtils.rabbitConnection();
创建交换机channel.exchangeDeclare(EXCHANGE_NAME,
创建队列channel.queueDeclare(QUEUE_NAME,true,true,false,null);
绑定队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,*.orange.*);
接收消息channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)-{System.out.println(接收到的消息new
String(message.getBody()));},cancelCallback-{});System.out.println(等下消息~);}
EXCHANGE_NAMEtopic_logs;private
RabbitUtils.rabbitConnection();
创建交换机channel.exchangeDeclare(EXCHANGE_NAME,
创建队列channel.queueDeclare(QUEUE_NAME,true,true,false,null);
绑定队列channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,*.*.rabbit);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,lazy.#);
接收消息channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)-{System.out.println(接收到的消息new
String(message.getBody()));},cancelCallback-{});System.out.println(等下消息~);}
RabbitUtils.rabbitConnection();Scanner
(true){System.out.println(请输入routerkey);String
scanner.next();System.out.println(请输入消息内容);String
scanner.next();channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes());}}
顾名思义无法被消费的消息一般来说producer将消息投递broker或者直接到queue里了consumer(消费者)从queue取出消息进行消费但某些时间由特定原因导致queue中的某些消息无法被消费这样如果没有后续的处理就变成了死信。
应用场景为了确保订单业务的消息数据不丢失需要使用到RabbitMQ的死信队列机制当消息被消息时发生了异常这是就将消息存到死信中还比如说用户商城下单成功并且点击支付后在指定时间支付时自动失效。
NORMAL_EXCHANGEnormal_exchange;public
NORMAL_QUEUEnormal_queue;public
RabbitUtils.rabbitConnection();
AMQP.BasicProperties().builder().expiration(10000).build();for
msginfoi;channel.basicPublish(NORMAL_EXCHANGE,zhangsan,basicProperties,msg.getBytes());}}
NORMAL_EXCHANGEnormal_exchange;private
DEAD_EXCHANGEdead_exchange;public
NORMAL_QUEUEnormal_queue;public
RabbitUtils.rabbitConnection();
创建c1交换机channel.exchangeDeclare(NORMAL_EXCHANGE,
BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,
正常队列设置死信交换机map.put(x-dead-letter-exchange,DEAD_EXCHANGE);
设置死信的routerKeymap.put(x-dead-letter-routing-key,lisi);//
创建普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//创建死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//
绑定channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,zhangsan);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,lisi);channel.basicConsume(NORMAL_QUEUE,true,(consumerTag,
{System.out.println(C1消息为message.getBody());},cancelCallback-{});}
DEAD_EXCHANGEdead_exchange;public
RabbitUtils.rabbitConnection();channel.basicConsume(DEAD_QUEUE,true,(consumerTag,message)-{System.out.println(消息为new
String(message.getBody()));},cancelCallback-{});}
NORMAL_EXCHANGEnormal_exchange;private
DEAD_EXCHANGEdead_exchange;public
NORMAL_QUEUEnormal_queue;public
RabbitUtils.rabbitConnection();
创建c1交换机channel.exchangeDeclare(NORMAL_EXCHANGE,
BuiltinExchangeType.DIRECT);channel.exchangeDeclare(DEAD_EXCHANGE,
正常队列设置死信交换机map.put(x-dead-letter-exchange,DEAD_EXCHANGE);
设置死信的routerKeymap.put(x-dead-letter-routing-key,lisi);
创建普通队列channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);//创建死信队列channel.queueDeclare(DEAD_QUEUE,false,false,false,null);//
绑定channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,zhangsan);channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,lisi);channel.basicConsume(NORMAL_QUEUE,false,(consumerTag,
String(message.getBody());System.out.println(C1消息为msg);
deliveryTagchannel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else{channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}},cancelCallback-{});}
RabbitMQ的集成还提供了一套高度可配置的模板类来生产、消费消息并管理AMQP基础设施组件如队(Queue)、交换机(Exchange)和绑定(Binding)。
!--AMQP依赖包含RabbitMQ--dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-amqp/artifactId/dependency
org.springframework.amqp.rabbit.core.RabbitTemplate;
org.springframework.boot.test.context.SpringBootTest;
org.springframework.test.context.junit4.SpringRunner;
Amqp!;rabbitTemplate.convertAndSend(simple.queue,
org.springframework.amqp.rabbit.annotation.RabbitListener;
org.springframework.stereotype.Component;
listenSimpleQueueMessage(String
{System.out.println(spring接收到的消息是
{rabbitTemplate.convertAndSend(key,
listenSimpleQueueMessage(String
{System.out.println(spring接收到的消息是
LocalDateTime.now());Thread.sleep(20);}RabbitListener(queues
{System.err.println(FanoutQueue1接收到的消息是
LocalDateTime.now());Thread.sleep(200);
通过执行结果我们可以看出listenFanoutQueue1这个监听器执行的是奇数而listenSimpleQueueMessage则是偶数。
且时间超出了1秒。
为什么呢
因为在生产者发送到队列中时消费者会预取消息在默认情况下进行平分机制在上面代码中我们可以看到我们使用了线程睡眠的方式模拟了性能在平分的情况下睡眠200的执行了25条所以导致了超出了1s。
1。
这意味着每次只会从队列中取出一条消息进行处理处理完后再去取下一条消息。
这种方式可以保证消息的顺序处理。
这种交换机需要进行绑定对应的队列绑定对应的队列后生产者将消息推送给交换机交换机会将消息分别都发给绑定的消息队列。
org.springframework.amqp.core.Binding;
org.springframework.amqp.core.BindingBuilder;
org.springframework.amqp.core.FanoutExchange;
org.springframework.amqp.core.Queue;
org.springframework.context.annotation.Bean;
org.springframework.context.annotation.Configuration;
FanoutExchange(fanoutExchange);}
BindingBuilder.bind(queue1()).to(fanoutExchange());}//
BindingBuilder.bind(queue2()).to(fanoutExchange());}
listenFanoutQueue1QueueMessage(String
{System.out.println(fanout.queue1接收到的消息是
LocalDateTime.now());}RabbitListener(queues
listenFanoutQueue2QueueMessage(String
{System.out.println(fanout.queue2接收到的消息是
fanoutExchange;rabbitTemplate.convertAndSend(exchange,,message);}
这种交换机需要指定一个key进行发送通过可以区别发送到那个队列同时这些队列也可以绑定相同的key那么也就是实现了fanout的效果。
可以通过bena的方式进注入这里我们采用RabbitListenner的方式RabbitListener(bindings
ExchangeTypes.DIRECT),//绑定的交换机key
{System.out.println(listenDirectQueue1接收到的消息是
ExchangeTypes.DIRECT),//绑定的交换机key
{System.out.println(listenDirectQueue2接收到的消息是
direct.exchange;rabbitTemplate.convertAndSend(exchange,routingKey,message);}
这种交换机其实和direct类型的交换机差不错只不过它是使用通配符的方式。
{System.out.println(topic.queue1接收到消息
{System.out.println(topic.queue2接收到消息
topic.exchange;rabbitTemplate.convertAndSend(exchange,routingKey,message);}
Queue(obj.queue);}//发送消息Testpublic
HashMap();map.put(name,test);map.put(age,18);rabbitTemplate.convertAndSend(obj.queue,map);}
我们重rabbitmq的ui界面中我们可以发现消息是基于JDK完成的序列化。
缺点这样不能很直接的看出消息的结果并且占用大量内存所以下面我们使用jdckson进行json序列化。
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactId
org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
org.springframework.amqp.support.converter.MessageConverter;Beanpublic
Jackson2JsonMessageConverter();}
dependencygroupIdcom.fasterxml.jackson.dataformat/groupIdartifactIdjackson-dataformat-xml/artifactId
org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
org.springframework.amqp.support.converter.MessageConverter;Beanpublic
Jackson2JsonMessageConverter();}RabbitListener(queues
{System.out.println(obj.queue接收到的消息是
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback