96SEO 2026-02-20 04:36 14
ack使用默认的all开启重试在一定时间内重试不成功则入库后续由定时任务继续发送这里在某些异常情况下一定会生产重复消息如何确保消息只消费一次后续在Consumer实现中详细展开这里我们只要确保生产的消息不论重试多少次最终都只会被发送到同一分区。

Kafka的确定消息的分区策略是
如果提供了key则根据hash(key)计算分区。
由于我们每个消息都有一个消息ID不管是重试多少次ID是不会变的同时我们不会在消息高峰阶段调整分区数量。
所以基于这些我们保证一个消息无论多少次都会发送到同一分区。
objectMapper.writeValueAsBytes(userDTO);}
bootstrap.server的IP地址和docker-compose.yml中的EXTERNAL保持一致*
Properties();result.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
l192.168.0.102:9093);result.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
org.apache.kafka.common.serialization.StringSerializer);result.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializer);result.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
bytes太小了这会导致邮件消息一个一个发送到kafka达不到批量发送的目的不符合发送邮件的场景result.put(ProducerConfig.BATCH_SIZE_CONFIG,
bytes限制的是一个batch的大小对于20KB的消息来说消息太小result.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
为了让更多的消息聚合到一个batch中提高吞吐量result.put(ProducerConfig.LINGER_MS_CONFIG,
在实际场景中我们的邮件消息一个大概20KB而batch.size默认是16KB也就是说在不修改该参数的情况下生产者只能一个一个的发消息这会导致我们的吞吐量上不去
限制了单次请求的大小默认为1MB也就是说即使batch.size为10MB但是由于一次只能最多发1MB吞吐量也上不去所以这里将max.request.size也改为10MB由于我们将一个批次可发送的数量大大提高所以可以让生产者等一会再发等更多的数据到达。
linger.ms默认是为0也就是立刻发送根据实际情况适当增加等待时间
KafkaProducer(KafkaConfiguration.loadProducerConfig(UserDTOSerializer.class.getName()));private
官方建议使用delivery.timeout.ms默认2分钟*
如果你想在本地测试Kafka生产者的重试详情可以看https://lists.apache.org/thread/nwg326bxpo7ry116nqhxmsmc3bokc6hm*
MessageFailedEntity();messageFailedEntity.setMessageId(userDTO.getMessageId());ObjectMapper
{messageFailedEntity.setMessageContentJsonFormat(mapper.writeValueAsString(userDTO));}
failed);}messageFailedEntity.setMessageType(MessageType.EMAIL);messageFailedEntity.setMessageFailedPhrase(MessageFailedPhrase.PRODUCER);messageFailedEntity.setFailedReason(e.getMessage());//
如果sendMessage传进来的是个list也同理不能放到list.foreach外面//
producer的执行速度可能慢于主线程可能拿到的值是空的是有问题的例如拿到的failedReason是空的messageFailedService.saveOrUpdateMessageFailed(messageFailedEntity);}
我们使用异步的方式发送如果发送成功打印一条消息关键在于重试callback函数只有在最后一次重试之后才会调用。
不会重试多少次就调用多少次callback
{MESSAGE_IDS.remove(messageId);}public
MESSAGE_IDS.containsKey(messageId);}//
contextInitialized(ServletContextEvent
{KAFKA_PRODUCERS.add(MessageProducer.PRODUCER);}Overridepublic
contextDestroyed(ServletContextEvent
{KAFKA_PRODUCERS.forEach(KafkaProducer::close);}
xmlnshttps://jakarta.ee/xml/ns/jakartaeexmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttps://jakarta.ee/xml/ns/jakartaeehttps://jakarta.ee/xml/ns/jakartaee/web-app_6_0.xsdversion6.0listenerlistener-classcom.business.server.listener.KafkaListener/listener-class/listener
在实际编码过程中可以参考官方写的Kafka权威指南对应章节书写或者参考各大云服务厂商的Kafak的开发者文档。
不过我建议还是看Kafka权威指南
我看了阿里云和华为云的虽然都号称兼容开源Kafka但是发现其版本和开源版本之间存在一定的滞后性许多最佳实践已经过时Kafka生产者端没什么特别的主要是根据业务场景设计消息格式以及如何尽可能的减小消息体积如果你的消息很大比我的场景还大达到了1M以上生产者的吞吐量是个问题消费者的消费速度也是个问题。
你要是问我有什么好的想法没有具体场景我确实想不出什么好的方式
http://localhost:8999/business-server
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback