96SEO 2026-02-19 20:57 14
。

本节我们将介绍最后一种函数#xff1a;UDTAF——用户自定义表值聚合函数。
UDTAFTableAggregateFunction的实现累加器定义创建累加
在前面几篇文章中我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。
本节我们将介绍最后一种函数UDTAF——用户自定义表值聚合函数。
UDTAF函数即具备了UDTF的特点也具备UDAF的特点。
即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样可以返回任意数量的行作为输又可以像《0基础学习PyFlink——用户自定义函数之UDAF》介绍的UDAF那样通过聚合的数据多组计算出一个值。
将一行中的“英语成绩”和“数学成绩”拆成“成绩”和“科目”相当于把一行数据拆解成多行如上图左侧“张三”只有一行而右侧有两行“张三”信息。
这种拆解操作就需要T类型的用户自定义函数比如UDTF和UDTAF。
而我们需要计算一个年级一科的平均成绩比如1年级英语的平均成绩则需要按年级聚合之后再做计算。
这个就需要A类型的用户自定义函数比如UDAF和UDTAF。
同时要满足上述两种技术方案的就是UDTAF。
我们先看下主体代码它和《0基础学习PyFlink——用户自定义函数之UDAF》中的很像。
但是有两个重要区别要设置成in_streaming_mode模式否则会报错udtaf要修饰一个对象而非一个方法
fileconfig.set_string(parallelism.default,
TableEnvironment.create(env_settings)row_type_tab_source
DataTypes.ROW([DataTypes.FIELD(name,
DataTypes.STRING())])students_score
t_env.from_elements(students_score,
row_type_tab_source)split_class
udtaf(SplitClass())tab_source.group_by(col(grade))
\.execute().print()TableAggregateFunction的实现
用于计算的类要继承于TableAggregateFunction即UDTAF中的TAF。
SplitClass(TableAggregateFunction):_class_keys
math]我们需要通过get_result_type告诉框架UDTAF函数返回的是什么类型的数据。
一般我们都是构造一个行类型——ROW然后定义其每个字段的值和类型
namestring类型用户姓名scorefloat类型考分avg
scorefloat类型科目年级平均分数classsting类型科目名称
accumulator累加器是用于参与计算的中间数据。
比如这个案例中我们会向让accumulator保存拆解后的数据即一行拆解成多行后的数据然后再计算各年级每科的平均成绩。
get_accumulator_type(self):return
DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD(name,
namestring类型姓名scorefloat类型分数classstring类型科目名称
create_accumulator(self):return
self._class_keys:accumulator.append(Row(row[name],
DataTypes.ROW([DataTypes.FIELD(name,
DataTypes.STRING())])可以看到result_type返回类型和accumulator_type累加器类型是不一样的也可以一样主要看怎么计算规则。
前者比后者多了“学科年级平均分”avg
这些字段和我们目标字段只差一个grade年级。
因为原始表中有grade且我们会通过grade聚类所以最终我们可以获得这个信息而不用在这儿定义。
需要注意的是虽然表值类型函数返回的是一组数据若干Row但是这儿只是返回Row的具体定义而不是ARRAY[Row]。
x这个函数会在最后执行它会通过累加器中的数据计算“学科年级平均分”然后构造和“返回类型”一直的Row到rows数组中。
最后通过yeild关键字返回一个生成器我们可以将其看成还是一组Row即拆解后的结果。
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
udf,udtf,udaf,udtaf,TableAggregateFunction
SplitClass(TableAggregateFunction):_class_keys
create_accumulator(self):return
self._class_keys:accumulator.append(Row(row[name],
get_accumulator_type(self):return
DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD(name,
DataTypes.ROW([DataTypes.FIELD(name,
fileconfig.set_string(parallelism.default,
TableEnvironment.create(env_settings)row_type_tab_source
DataTypes.ROW([DataTypes.FIELD(name,
DataTypes.STRING())])students_score
t_env.from_elements(students_score,
row_type_tab_source)split_class
udtaf(SplitClass())tab_source.group_by(col(grade))
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback