96SEO 2026-02-19 16:31 0
的生产和消费功能在公司内部微服务项目中大量使用了该框架以下是个人的一些记录。

是一个分布式流处理平台广泛用于构建实时数据管道和流应用程序。
它具有以下几个关键功能
发布和订阅记录流类似于消息队列或企业消息传递系统。
存储记录流存储数据流容错。
处理记录流实时或批处理方式处理数据流。
KafkaListener用于消费消息的注解支持自动并发和分区分配。
KafkaMessageListenerContainer低级别的消息监听容器提供了更大的灵活性。
作为微服务之间的消息传递中介。
实时数据处理处理流式数据如日志收集、监控和分析。
事件溯源使用Kafka
记录所有事件支持事件溯源和审计。
数据集成连接不同的数据源和目标系统实现数据集成和同步。
dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependencydependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency2.
jdbc:mysql://localhost:3306/testusername:
开启事务管理transaction:management:enabled:
消费监听接口监听的主题不存在时默认会报错listener:missing-topics-fatal:
当retris为0时produce不会重复。
retirs重发此时repli节点完全成为leader节点不会产生消息丢失。
retries:
0#procedure要求leader在考虑完成请求之前收到的确认数用于控制发送记录在服务端的持久化其值可以为如下#acks
如果设置为零则生产者将不会等待来自服务器的任何确认该记录将立即添加到套接字缓冲区并视为已发送。
在这种情况下无法保证服务器已收到记录并且重试配置将不会生效因为客户端通常不会知道任何故障为每条记录返回的偏移量始终设置为-1。
#acks
这意味着leader会将记录写入其本地日志但无需等待所有副本服务器的完全确认即可做出回应在这种情况下如果leader在确认记录后立即失败但在将数据复制到所有的副本服务器之前则记录将会丢失。
#acks
这意味着leader将等待完整的同步副本集以确认记录这保证了只要至少一个同步副本服务器仍然存活记录就不会丢失这是最强有力的保证这相当于acks
指定消息key和消息体的编解码方式key-serializer:
org.apache.kafka.common.serialization.StringSerializervalue-serializer:
org.apache.kafka.common.serialization.StringSerializer#
每次批量发送消息的数量,produce积累到一定数据一次发送#batch-size:
produce积累数据一次发送缓存大小达到buffer.memory就发送数据#buffer-memory:
由于在kafka中同一组中的consumer不会读取到同一个消息依靠groud.id设置组名group-id:
smallest和largest才有效如果smallest重新0开始读取如果是largest从logfile的offset读取。
一般情况下我们都是设置smallestauto-offset-reset:
设置自动提交offsetenable-auto-commit:
true#如果enable.auto.commit为true则消费者偏移自动提交给Kafka的频率以毫秒为单位默认值为5000。
auto-commit-interval:
指定消息key和消息体的编解码方式key-deserializer:
org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:
org.apache.kafka.common.serialization.StringDeserializerproperties:spring.json.value.default.type:
创建一个Kafka消费者工厂用于生产特定配置的Kafka消费者。
*
ConcurrentKafkaListenerContainerFactoryString,
kafkaListenerContainerFactory(ConsumerFactoryString,
{ConcurrentKafkaListenerContainerFactoryString,
ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory);log.info(KafkaListenerContainerFactory
创建一个Kafka消费者工厂用于生产特定配置的Kafka消费者。
*
这个方法的主要作用是配置消费者的各项参数包括连接服务器、消费者组ID、序列化器等*
返回一个配置好的消费者工厂用于创建字符串键和Event值的消费者实例。
*/Beanpublic
初始化配置属性映射用于设置消费者的配置参数。
MapString,
配置Kafka服务器的连接地址和端口。
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
配置消费者的组ID用于标识消费者属于哪个消费组。
configProps.put(ConsumerConfig.GROUP_ID_CONFIG,
配置键和值的反序列化器这里使用StringDeserializer和JsonDeserializer。
//
StringDeserializer用于反序列化键JsonDeserializer用于反序列化值并且信任所有包设置默认值类型为Event类。
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
JsonDeserializer.class);configProps.put(JsonDeserializer.TRUSTED_PACKAGES,
*);configProps.put(JsonDeserializer.VALUE_DEFAULT_TYPE,
使用配置属性映射创建并返回一个默认的Kafka消费者工厂。
return
DefaultKafkaConsumerFactory(configProps);}
topic;Value(${kafka.group-2})public
topic,消费者组这里消费者组可以覆盖消费者工厂中配置的消费者组**
{System.out.println([event-listener]
以上不同的写法都可以根据需要可以直接写消费者相关信息也可以通过配置文件变量等方式获取选择适合自己的即可。
一般用配置变量的方式。
springboot项目然后发送消息即可查看消息是否已经消费到了。
来控制监听器是否在启动的时候开始进行消费监听当然这种方式不够灵活我们可以写一个接口在需要的时候进行启动和暂停以下是个示例。
com.tan.kafka.service.KafkaListenerControlService;
org.springframework.beans.factory.annotation.Autowired;
org.springframework.web.bind.annotation.PostMapping;
org.springframework.web.bind.annotation.RequestMapping;
org.springframework.web.bind.annotation.RestController;/***
kafkaListenerControlService;PostMapping(/pause)public
{kafkaListenerControlService.pauseListener();return
Paused;}PostMapping(/resume)public
{kafkaListenerControlService.resumeListener();return
kafkaListenerEndpointRegistry;PostConstructpublic
{log.info(KafkaListenerControlService.checkListeners);for
kafkaListenerEndpointRegistry.getListenerContainerIds())
kafkaListenerEndpointRegistry.getListenerContainer(listenerId);if
本方法旨在提供一种方式来暂停应用程序中特定的Kafka监听器。
这可能在需要临时停止处理新消息*
KafkaListenerEndpointRegistry#getListenerContainer(String)
MessageListenerContainer#pause()
通过监听器端点注册表获取名为eventListener的监听器容器MessageListenerContainer
kafkaListenerEndpointRegistry.getListenerContainer(eventListener);//
{container.pause();log.info(Paused
container.getListenerId());}}/***
它首先从注册表中获取名为eventListener的监听器容器*
然后检查容器是否为空。
如果容器存在则调用其resume方法来恢复监听器的运行。
*
KafkaListenerEndpointRegistry#getListenerContainer(String)*
MessageListenerContainer#resume()*/public
从注册表中获取名为eventListener的监听器容器MessageListenerContainer
kafkaListenerEndpointRegistry.getListenerContainer(eventListener);//
记录恢复操作的信息包括监听器的IDlog.info(Resumed
http://localhost:8080/kafka/resume
如果pauseListener()方法被多个线程同时访问可能会出现竞态条件或导致未定义的行为。
尽管在这个特定的代码片段中不容易直接判断线程安全性问题。
因此我在这里进行了完善确保多线程并发访问下的安全性以上写法太过繁琐可以参考如下代码方式。
ReentrantLock();Autowiredprivate
listenerName*/GetMapping(/start/{listenerName})public
(!isListenerRunning(listenerName))
listenerName);startListener(listenerName);}resumeListener(listenerName);log.info(resume
监听器的名称。
*/GetMapping(/pause/{listenerName})public
registry.getListenerContainer(listenerName);if
listenerName);return;}log.info(Attempting
listenerName);listenerContainer.pause();log.info(listenerName
{Objects.requireNonNull(registry.getListenerContainer(listenerName)).start();}
{Objects.requireNonNull(registry.getListenerContainer(listenerName)).resume();}
Objects.requireNonNull(registry.getListenerContainer(listenerName)).isRunning();}
最终我在测试完后结果是符合预期的先暂停然后持续发送消息观察有无收到消息然后在重新启动监听即可。
http://localhost:8080/consumer/pause/eventListener
生成Long类型的唯一IDMSG_ID$(uuidgen)MESSAGERandom
$(generate_random_number)JSON_MESSAGE{\id\:\$ID\,
kafka集成进行消费的使用过程对消费者的配置和以及对消费者的启动和暂停实践详细介绍了配置各个步骤。
不足之处未提及异常处理批量消费消费者动态扩缩提升消费能力的问题这些后面在陆续补充。
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback