2026-02-20 04:30 5
前面已经实现了虚拟主机大部分功能以及转发规则的判定也就是说现在消息已经可以通过

那么接下来要解决的问题就是消费者该如何订阅消息队列如何把消息推送给消费者以及消费者如何描述自己怎么执行任务~
消费者是以队列为维度订阅消息的并且一个队列可以被多个消费者订阅那么一旦队列中有消息这个消息到底因该给谁呢此处就约定消费者之间按照
这里我们就需要定义一个类ConsumerEnv用来描述一个消费者如下
autoAck;//通过这个回调来处理收到的消息private
List用来包含若干个上述消费者有哪些消费者订阅了当前队列如下图
ArrayList();//记录当取到了第几个消费者(AtomicInteger
{consumerEnvList.add(consumerEnv);}/***
consumerEnvList.size();consumerSeq.getAndIncrement();//
consumerEnvList.get(index);}VirtualHost
添加一个队列的订阅者当队列收到消息之后就要把消息推送给对应的订阅者*
{consumerManager.addConsumer(consumerTag,
consumer);System.out.println([VirtualHost]
{e.printStackTrace();System.out.println([VirtualHost]
false;}}1.2.2、消费者描述自己执行任务方式实现思路
当执行订阅消息的时候我们就让消费者自己去实现处理消息的操作消息的内容通过参数传递具体要干啥取决于消费者自己的业务路基最后再让线程池来执行回调函数.
表达式让消费者在订阅消息的时候就可以实现未来收到消息后如何去处理消息的操作.
如果就一个扫描线程既要获取消息又要执行回调这一个线程可能会忙不过来因为消费者给出的回调具体干什么的咱们是不知道的.
一个简单粗暴的办法就是直接让扫描线程不停的循环遍历所有队列发现有元素就立即处理。
另一个更优雅的办法我采取的办法就是用一个阻塞队列队列中的元素就是接收消息的队列的名字扫描线程只需要盯住这一个阻塞对垒即可此时阻塞队列中传递的队列名就相当于
Executors.newFixedThreadPool(4);//存放令牌的队列private
LinkedBlockingQueue();//扫描线程private
parent;//创建扫描线程取队列中消费消息scannerThread
tokenQueue.take();//2.根据令牌找到队列MSGQueue
parent.getMemoryDataCenter().getQueue(queueName);if(queue
queueName);}//3.从这个队列中消费一个消息synchronized
RuntimeException(e);}}});//设置为后台线程scannerThread.setDaemon(true);scannerThread.start();}public
{tokenQueue.put(queueName);}/***
parent.getMemoryDataCenter().getQueue(queueName);if(queue
{queue.addConsumerEnv(consumerEnv);//如果当前队列中已经有一些消息了需要立即消费掉int
parent.getMemoryDataCenter().getMessageCount(queueName);for(int
{//这个方法调用一次就消费一条消息consumeMessage(queue);}}}/***
queue.chooseConsumer();if(luckDog
{//当前队列中没有消费者暂时不用消费等后面有消费者了再说return;}//2.从队列中取出一个消息Message
parent.getMemoryDataCenter().pollMessage(queue.getName());if(message
{//当前队列中还没有消息也不需要消费return;}//3.把消息带入到消费者的回调方法中丢给线程池执行workerPool.submit(()
{//1.把消息放到待确认的集合当中这个操作一定要在执行回调之前(防止执行回调过程中出现异常导致消息丢失)parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(),
message);//2.真正执行回调操作luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(),
message.getBasicProperties(),message.getBody());//3.如果当前是
删除硬盘上的消息if(message.getDeliverMode()
{parent.getDiskDataCenter().deleteMessage(queue,
删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),
删除内存上的消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println([ConsumerManager]
正确消费就是指消费者的回调方法顺利执行完了没有抛异常之类的这条消息的使命就完成了此时就可以删除了。
autoAckfalse手动应答需要消费者这边在自己的回调方法内部显式调用
memoryDataCenter.getQueue(queueName);if(queue
memoryDataCenter.getMessage(messageId);if(message
messageId);}//2.各个维度删除消息if(message.getDeliverMode()
{diskDataCenter.deleteMessage(queue,
message);}memoryDataCenter.removeMessage(messageId);memoryDataCenter.removeMessageWaitAck(queueName,
messageId);System.out.println([VirtualHost]
{System.out.println([VirtualHost]
messageId);e.printStackTrace();return
queue.chooseConsumer();if(luckDog
{//当前队列中没有消费者暂时不用消费等后面有消费者了再说return;}//2.从队列中取出一个消息Message
parent.getMemoryDataCenter().pollMessage(queue.getName());if(message
{//当前队列中还没有消息也不需要消费return;}//3.把消息带入到消费者的回调方法中丢给线程池执行workerPool.submit(()
{//1.把消息放到待确认的集合当中这个操作一定要在执行回调之前(防止执行回调过程中出现异常导致消息丢失)parent.getMemoryDataCenter().addMessageWaitAck(luckDog.getQueueName(),
message);//2.真正执行回调操作luckDog.getConsumer().handlerDelivery(luckDog.getConsumerTag(),
message.getBasicProperties(),message.getBody());//3.如果当前是
删除硬盘上的消息if(message.getDeliverMode()
{parent.getDiskDataCenter().deleteMessage(queue,
删除上面的待确认集合中的消息parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(),
删除内存上的消息中心的消息parent.getMemoryDataCenter().removeMessage(message.getMessageId());System.out.println([ConsumerManager]
{e.printStackTrace();}});}如果在回调方法中抛异常了
回调方法中抛异常了后续逻辑执行不到这个消息就会始终呆在待确认的集合中
中不叫线程人家是叫进程但是注意这个进程不是操作系统中的进程而是
待确认集合中每个消息待了多久了如果超出了一定的时间范围就会把这个消息放到一个特定的队列
重启之后这个消息就又被加载回内存了就像从来没有被消费过一样消费者就又机会重新拿到这个消息重新消费重复消费的问题是由消费者的业务代码负责保证的broker
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback