96SEO 2026-02-20 00:51 0
实时数仓一般需要一直运行无法使用常规的配置文件重启加载方式来修改需要读取的ODS层数据因此需要通过Flink-cdc实时监控MySql中的维度数据配置信息表实时动态的发布广播信息。

主流数据根据广播数据及时调整处理逻辑并自动在HBase中创建相应的维度表和写入相应的维度数据。
ods业务主题数据数据清洗是否为JSON格式使用flink-cdc读取监控配置表数据在HBase中创建维度表做成广播流连接主流和广播流筛选出需要写出的字段写出到Hbase
所有Flink程序的基类负责搭建Flink运行环境和设置并行度和检查点等相关参数。
其中我们的数据来源也确定为Kafka故数据源代码也写在这里。
每个Flink程序的具体处理逻辑由handle()函数来负责处理。
bean负责存放项目运行过程中需要用到的bean对象比如当前flink-cdc程序中需要用到的TableProcessDim类配置信息表对象。
constant负责存放程序中需要使用到常量参数function负责存放一些通用的函数方法util一般存放和数据连接相关的工具类test目录:
appDimApp里面写的是dim层的具体实现具体步骤如上述流程图所示。
function负责存放数据处理的实现类一般会继承相应的父类在dim层可以直接调用这里的子类来实现父类接口让dim层的代码逻辑更加清晰。
realtime-dwd模块如上realtime-dws模块如上
数据清洗简单来说就是对数据进行简单的转换筛选。
首先如果在转换过程中出现异常直接过滤掉。
注意这里无需抛出异常因为如果throw
exception会导致整个程序异常终止而在数据处理过程中出现部分数据格式错误而无法正常进行格式转换是很常见的只需将异常信息打印到控制台即可。
如果转换正常再判断是否满足以下三个条件
数据库名为gmall数据类型不是bootstrap-start或者bootstrap-completedata字段不是null且长度不为0
注意在构建Flink-cdc对应的MySQLSource时tableList参数必须是库表.表名结构
调用env.fromSource()方法将数据源的发送过来的数据转换Ds数据流在该方法中可以设置数据的水位线。
获取到数据后建议先打印到控制台查看数据的具体结构。
注意读取配置信息表的并发度必须设置为1如果不为1只能读取r操作数据其他更新数据无法读取。
MySqlSource.Stringbuilder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList(databaseName)
database.tableList(databaseName.tableName)
JsonDebeziumDeserializationSchema())
String.startupOptions(StartupOptions.initial()).build();return
数据库中的配置表数据经过Flink-cdc处理后发送到这里是json格式的字符串这里根据数据的四种类型op在HBase中进行不同的建表删表操作同时对数json字符数据进行转换映射处理转换为对应的bean对象数据流。
这里一个数据产生一个处理后的对象故使用Map算子或FlatMap算子都可以。
代表update需要先删除掉旧表然后根据新表的字段创建一个新表
创建HBase连接创建连接是很耗费资源的行为因此新建连接和关闭连接需要写在open和close方法中HBase中想要对表进行创建和删除等DDL操作都由Admin对象管理如果需要对数据进行插入删除等DML操作需要创建Table对象。
详细操作细节请看相应代码即可。
SingleOutputStreamOperatorTableProcessDim
createHbaseTable(DataStreamSourceString
{SingleOutputStreamOperatorTableProcessDim
HBaseUtil.getHBaseConnection();}Overridepublic
{//关闭连接HBaseUtil.closeHBaseConn(connection);}Overridepublic
out){//使用读取的配置表数据到HBase中创建与之对应的表格try
JSONObject.parseObject(s);String
jsonObject.getString(op);TableProcessDim
TableProcessDim.class);dim.setOp(op);//当配置表发送一个D类型的数据对应的HBase需要删除一张维度表deleteTable(dim);}
TableProcessDim.class);createTable(dim);dim.setOp(op);}
TableProcessDim.class);deleteTable(dim);createTable(dim);}dim.setOp(op);out.collect(dim);}
{HBaseUtil.createHBaseTable(connection,Constant.HBASE_NAMESPACE,dim.getSinkTable(),split);}
{HBaseUtil.dropHBaseTable(connection,
{e.printStackTrace();}}});return
从Flink-cdc获取的数据gmall2023_config是作为一个参数来控制我们对于主流即ODS层数据gmall数据库的业务数据的处理逻辑。
gmall2023)_config库中的Table_process_dim表决定了后续程序筛选哪个表作为维度信息并且定义了表中有哪些字段。
转换为广播流只需要调用上述得到的TableProcessDimStream的broadcast方法使用的主流(gmall业务数据)的connect方法得到一个连接流然后对连接流进行process处理。
创建BroadcastProcessFunction在里面分别有两个函数
processBroadcastElement():处理广播流数据processElement()处理主流数据
读取广播状态将配置表信息写到广播状态中根据广播状态数据的op对状态做相应的修改
查询广播状态判断当前数据对应的表是否存在于状态中如果数据比状态来的更早造成状态为空需要对状态做预处理提前从mysql中读取维表配置表信息如果根据当前表的表名查询的状态不为空说明该表为维度数据使用收集器收集起来。
在维度配置信息表中的sink_column字段里定义了维度表需要的字段使用filter算子对JsonObj里面的data字段进行过滤即可获取到想要的字段数据。
过滤后的数据流调用它的addSink方法方法中需要传入一个SinkFunction接口类。
该接口需要实现三个方法分别是
open方法获取HBase连接close方法关闭HBase连接invoke方法写入数据时调用的方法根据jsonObj中的type做不同处理如果是delete需要删除对应的维度表数据否则都是直接覆盖写入。
代码的Gitee仓库地址https://gitee.com/langpaian/gmall2023-realtime.git
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback