96SEO 2026-02-20 07:33 0
层剩余的需求上一节我们把日志数据根据不同类型分流写入到了不同的主题

既然是独立访客就必须对日志中的数据做去重独立访客数一般用来做日活指标因为我们的机器一般都是
小时全年无休的所以我们实时数仓也可以做这种日级别的指标需求通过状态来存储历史就可以实现而怎么判断访客是否重复这就又用到了
中的状态编程状态就是历史和上一节我们判断新老访客一样我们这里也可以给每个
keyby存储上一次访问的日期注意是日期只精确到天每来一条数据就判断它的
独立访客数据对应的页面必然是会话起始页面last_page_id
null所以对于跨天的访问不能计算在内昨天到今天访问了多个页面而今天页面的
获取执行环境StreamExecutionEnvironment
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
开启checkpointenv.enableCheckpointing(5
CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage(hdfs://hadoop102:8020/s/ck);env.getCheckpointConfig().setCheckpointTimeout(10
60000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
设置最大共存的checkpoint数量env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
uvDetail;DataStreamSourceString
env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic,
两步都一起完成SingleOutputStreamOperatorJSONObject
JSONObject.parseObject(value);//
jsonObject.getJSONObject(page).getString(last_page_id);if
json.getJSONObject(common).getString(mid));//
使用富函数,因为富函数提供更多的信息如上下文等SingleOutputStreamOperatorJSONObject
ValueStateDescriptor(lastVisit,
getRuntimeContext().getState(stateDescriptor);}Overridepublic
更新状态lastVisitDate.update(curDate);return
dwd_traffic_unique_visitor_detail;uvDS.map(data
data.toJSONString()).addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));//
执行任务env.execute(DwdTrafficUniqueVisitorDetail);}}
上面我们的代码逻辑看起来已经没什么问题了但是我们可以设想假设一个用户,2024-01-01
天我们依然要一直存储它的状态而我们判断用户是否已经登录的逻辑是lastVisitDate
lastVisitDate今天所以我们完全可以在一天之后把该用户的
ValueStateDescriptor(lastVisit,
TTLstateDescriptor.enableTimeToLive(new
StateTtlConfig.Builder(Time.days(1))//
可更新,并且在创建或更新状态的时候更新.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());lastVisitDate
getRuntimeContext().getState(stateDescriptor);}
跳出的概念跳出指的是用户在一次会话中只访问了一个页面的情况注意粒度是会话我们在之前做离线数仓的时候做过跳出率的指标对于离线数仓我们可以在
层构建一张流量域近1日会话粒度页面浏览表dws_traffic_session_page_view_1d通过下面的
CAST(SUM(IF(page_count1,1,0))/COUNT(*))
dws_traffic_session_page_view_1d
在这里的实时数仓中我们不可能等到一天结束最后才去计算跳出率但是我们这里又没有
如果我短时间10s内发生多个跳出但是正好这些跳出都在一个会话这会导致窗口结束时误以为这不是跳出毕竟窗口内有多条数据②
可能我的一次正常的会话被会话窗口切分到两个不同的会话窗口结果把一个非跳出访问计算为
会结束所以我们可以设置一个定时器定时器时间范围内的数据如果没数据来就视作一个会话结束触发计算并结合状态编程把新会话的首页存入状态
null则该页面是新的会话起始页开启定时器并将数据自身写入状态如果状态不为
null说明刚跳出一次并且在定时器时间范围内又进来一次这种情况需要将第一条数据跳出的数据也就是写入状态中的数据输出然后将自身写入状态定时器依然存在等时间到了触发计算如果
期望所有匹配的事件严格的一个接一个出现中间没有任何不匹配的事件。
对应方法为
忽略匹配的事件之间的不匹配的事件。
对应方法为followedBy()不确定的松散连续:
更进一步的松散连续允许忽略掉一些匹配事件的附加匹配。
对应方法为followedByAny()。
这里需要注意因为我们后面要保证数据有序所以我们最好指定事件时间的提取字段并添加水位线设置合理的超时时间理论上可以保证数据绝对有序
获取执行环境StreamExecutionEnvironment
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);
开启checkpointenv.enableCheckpointing(5
CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage(hdfs://hadoop102:8020/s/ck);env.getCheckpointConfig().setCheckpointTimeout(10
60000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
设置最大共存的checkpoint数量env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L));
user_jump_detail;DataStreamSourceString
env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic,
JSONSingleOutputStreamOperatorJSONObject
pageDS.map(JSON::parseObject);//
jsonDS.assignTimestampsAndWatermarks(WatermarkStrategy.JSONObjectforBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new
SerializableTimestampAssignerJSONObject()
element.getLong(ts);}})).keyBy(json
json.getJSONObject(common).getString(mid));接下来是核心的定义
Pattern.JSONObjectbegin(start).where(new
value.getJSONObject(page).getString(last_page_id)
value.getJSONObject(page).getString(last_page_id)
null;}}).within(Time.seconds(10L));//
Pattern.JSONObjectbegin(start).where(new
value.getJSONObject(page).getString(last_page_id)
next.within(Time.seconds(10L));//
建模式序列作用到流上PatternStreamJSONObject
OutputTag(timeout);SingleOutputStreamOperatorString
patternStream.select(timeoutTag,//
PatternTimeoutFunctionJSONObject,
对于超时数据来说,当前的数据第一个规则匹配上了,第二个没有匹配上导致超时,那么我们要提取的就是当前数据(第一个数据,第二个数据没来)//
数据类型,因为考虑到我们可能使用的是循环模式(只有一个key)Overridepublic
map.get(start).get(0).toJSONString();}},
PatternSelectFunctionJSONObject,
匹配上的数据,我们只要第一个数据,因为只能证明第一个数据是跳出数据Overridepublic
map.get(start).get(0).toJSONString();}});DataStreamString
selectDS.getSideOutput(timeoutTag);//
dwd_traffic_user_jump_detail;unionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(targetTopic));//
启动任务env.execute(DwdTrafficUserJumpDetail);}
超时时间内规则一被满足未等到第二条数据则会被判定为超时数据。
所以我们只要把超时数据和
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback