96SEO 2026-02-23 12:21 3
对接的时候,或者处在不同的网络环境下(比如在互联网和政务外网分布部署服务的时候),我们需要对接多台kafka来达到我们的业务需求。那么当kafka存在多数据源的情况,就与单机的情况有所不同。

org.springframework.kafka:spring-kafka:2.8.2配置
如果是单机的kafka,我们直接通过springboot自动配置的就可以使用,例如在yml里面直接引用:
spring:kafka:producer:key-serializer:
org.apache.kafka.common.serialization.StringSerializervalue-serializer:
org.apache.kafka.common.serialization.StringSerializerconsumer:key-deserializer:
org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:
org.apache.kafka.common.serialization.StringDeserializerbootstrap-servers:
server001.bbd:9092
在使用的时候直接注入,然后就可以使用里面的方法了。
本篇文章主要讲的是在多数据源下的使用,和单机的有所不同。我也看了网上的一些博客,但是当我按照网上的配置去做的时候总是会报错:
kafkaTemplate 这个bean找不到。所以没办法,只有按照springboot自动配置里面的来改。
org.springframework.beans.factory.ObjectProvider;
org.springframework.beans.factory.annotation.Qualifier;
org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
org.springframework.boot.autoconfigure.kafka.KafkaProperties;
org.springframework.boot.context.properties.EnableConfigurationProperties;
org.springframework.context.annotation.Bean;
org.springframework.context.annotation.Configuration;
org.springframework.context.annotation.Primary;
org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
org.springframework.kafka.config.KafkaListenerContainerFactory;
org.springframework.kafka.core.*;
org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
org.springframework.kafka.support.LoggingProducerListener;
org.springframework.kafka.support.ProducerListener;
org.springframework.kafka.support.converter.RecordMessageConverter;
org.springframework.kafka.transaction.KafkaTransactionManager;import
java.io.IOException;Configuration(proxyBeanMethods
ConditionalOnClass(KafkaTemplate.class)
EnableConfigurationProperties(KafkaProperties.class)
KafkaConfiguration(KafkaProperties
properties;this.kafkaSecondProperties
kafkaSecondProperties;}Bean(kafkaTemplate)Primarypublic
kafkaTemplate(ProducerFactoryObject,
kafkaProducerFactory,ProducerListenerObject,
kafkaProducerListener,ObjectProviderRecordMessageConverter
KafkaTemplate(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return
kafkaTemplate;}Bean(kafkaSecondTemplate)public
kafkaSecondTemplate(Qualifier(kafkaSecondProducerFactory)
kafkaProducerFactory,Qualifier(kafkaSecondProducerListener)
kafkaProducerListener,ObjectProviderRecordMessageConverter
KafkaTemplate(kafkaProducerFactory);messageConverter.ifUnique(kafkaTemplate::setMessageConverter);kafkaTemplate.setProducerListener(kafkaProducerListener);kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());return
kafkaTemplate;}Bean(kafkaProducerListener)Primarypublic
LoggingProducerListener();}Bean(kafkaSecondProducerListener)public
LoggingProducerListener();}Bean(kafkaConsumerFactory)Primarypublic
kafkaConsumerFactory(ObjectProviderDefaultKafkaConsumerFactoryCustomizer
{DefaultKafkaConsumerFactoryObject,
DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());customizers.orderedStream().forEach((customizer)
customizer.customize(factory));return
factory;}Bean(kafkaSecondConsumerFactory)public
kafkaSecondConsumerFactory(ObjectProviderDefaultKafkaConsumerFactoryCustomizer
{DefaultKafkaConsumerFactoryObject,
DefaultKafkaConsumerFactory(this.kafkaSecondProperties.buildConsumerProperties());customizers.orderedStream().forEach((customizer)
customizer.customize(factory));return
factory;}Bean(zwKafkaContainerFactory)KafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger,
zwKafkaContainerFactory(Qualifier(value
{ConcurrentKafkaListenerContainerFactoryInteger,
ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(kafkaSecondConsumerFactory);factory.setConcurrency(3);factory.getContainerProperties().setPollTimeout(3000);return
factory;}Bean(kafkaProducerFactory)Primarypublic
kafkaProducerFactory(ObjectProviderDefaultKafkaProducerFactoryCustomizer
{DefaultKafkaProducerFactoryObject,
DefaultKafkaProducerFactory(this.properties.buildProducerProperties());String
this.properties.getProducer().getTransactionIdPrefix();if
{factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer)
customizer.customize(factory));return
factory;}Bean(kafkaSecondProducerFactory)public
kafkaSecondProducerFactory(ObjectProviderDefaultKafkaProducerFactoryCustomizer
{DefaultKafkaProducerFactoryObject,
DefaultKafkaProducerFactory(this.kafkaSecondProperties.buildProducerProperties());String
this.kafkaSecondProperties.getProducer().getTransactionIdPrefix();if
{factory.setTransactionIdPrefix(transactionIdPrefix);}customizers.orderedStream().forEach((customizer)
customizer.customize(factory));return
factory;}BeanConditionalOnProperty(name
spring.kafka.producer.transaction-id-prefix)public
kafkaTransactionManager(ProducerFactory?,
KafkaTransactionManager(producerFactory);}BeanConditionalOnProperty(name
spring.kafka.jaas.enabled)public
KafkaJaasLoginModuleInitializer
{KafkaJaasLoginModuleInitializer
KafkaJaasLoginModuleInitializer();KafkaProperties.Jaas
(jaasProperties.getControlFlag()
{jaas.setControlFlag(jaasProperties.getControlFlag());}if
(jaasProperties.getLoginModule()
{jaas.setLoginModule(jaasProperties.getLoginModule());}jaas.setOptions(jaasProperties.getOptions());return
jaas;}Bean(kafkaAdmin)Primarypublic
KafkaAdmin(this.properties.buildAdminProperties());kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());return
com.alibaba.fastjson.JSONObject;import
com.ddb.zggz.config.ApplicationConfiguration;
org.springframework.beans.factory.annotation.Autowired;
org.springframework.beans.factory.annotation.Qualifier;
org.springframework.kafka.core.KafkaTemplate;
org.springframework.kafka.support.SendResult;
org.springframework.stereotype.Component;
org.springframework.util.concurrent.ListenableFuture;
org.springframework.util.concurrent.ListenableFutureCallback;import
javax.annotation.Resource;Component
kafkaSecondTemplate;Resourceprivate
{ListenableFutureSendResultString,
(zw.equals(configuration.getEnvironment())){sendResultListenableFuture
kafkaSecondTemplate.send(configuration.getPushTopic(),
(net.equals(configuration.getEnvironment())){sendResultListenableFuture
kafkaTemplate.send(configuration.getPushTopic(),
IllegalArgumentException(kakfa发送消息失败);}sendResultListenableFuture.addCallback(new
ListenableFutureCallbackSendResultString,
{log.error(kafka发送的message报错发送数据{},
{log.info(kafka发送的message成功发送数据{},
com.alibaba.fastjson.JSONObject;import
com.ddb.zggz.config.ApplicationConfiguration;
com.ddb.zggz.model.dto.ApprovalDTO;
com.ddb.zggz.param.OffShelfParam;
com.ddb.zggz.service.GzApprovalService;
com.ddb.zggz.service.GzServiceService;
org.apache.commons.lang3.StringUtils;
org.apache.kafka.clients.consumer.ConsumerRecord;
org.springframework.beans.factory.annotation.Autowired;
org.springframework.kafka.annotation.DltHandler;
org.springframework.kafka.annotation.KafkaListener;
org.springframework.kafka.annotation.RetryableTopic;
org.springframework.retry.annotation.Backoff;
org.springframework.stereotype.Component;
org.springframework.util.ObjectUtils;import
gzApprovalService;Autowiredprivate
gzServiceService;KafkaListener(topics
${application.config.push-topic},
zwKafkaContainerFactory)RetryableTopic(include
consumerRecord.value();PushParam
(version-approval.equals(pushParam.getEvent()))
JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()),
ApprovalDTO.class);gzApprovalService.approval(approvalDTO);}//服务下架if
(pushParam.getEvent().equals(server-OffShelf-gzt))
JSONObject.parseObject(JSONObject.toJSONString(pushParam.getData()),
OffShelfParam.class);gzServiceService.offShelfV1(offShelfParam.getReason(),
offShelfParam.getVersion());}}DltHandlerpublic
com.alibaba.fastjson.annotation.JSONField;
com.ddb.zggz.model.GzH5VersionManage;
com.ddb.zggz.model.dto.ApprovalDTO;
com.ddb.zggz.param.OffShelfParam;
com.ddb.zggz.param.PublishParam;
com.ddb.zggz.param.ReviewAndRollback;
com.fasterxml.jackson.annotation.JsonFormat;
com.fasterxml.jackson.databind.annotation.JsonDeserialize;
com.fasterxml.jackson.databind.annotation.JsonSerialize;
com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
LocalDateTimeSerializer.class)JsonDeserialize(using
LocalDateTimeDeserializer.class)JSONField(format
toKafkaVersion(GzH5VersionManage
PushParam();pushParam.setData(gzH5VersionManage);pushParam.setEvent(save-version);return
PushParam();pushParam.setData(gzService);pushParam.setEvent(save-server);return
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback