96SEO 2026-02-20 10:23 12
数技术组件都是运行在JVM上的Flink也是运行在JVM上基于JVM的数据分析引擎都需要将大量的数据存储在内存中这就不得不面临JVM的一些问题比如Java对象存储密度较低等。

针对这些问题最常用的方法就是实现一个显式的内存管理也就是说用自定义的内存池来进行内存的分配回收接着将序列化后的对象存储到内存块中。
Avro等等。
但是Flink依然是选择了自己定制的序列化框架那么到底有什么意义呢若Flink选择自己定制的序列化框架对类型信息了解越多可以在早期完成类型检查更好的选取序列化方式进行数据布局节省数据的存储空间直接操作二进制数据。
Flink在其内部构建了一套自己的类型系统Flink现阶段支持的类型分类如图所示从图中可以看到Flink类型可以分为基础类型Basic、数组Arrays、复合类型Composite、辅助类型Auxiliary、泛型和其它类型Generic。
Flink支持任意的Java或是Scala类型。
不需要像Hadoop一样去实现一个特定的接口org.apache.hadoop.io.WritableFlink能够自动识别数据类型。
TypeInformation的思维导图如图所示从图中可以看出在Flink中每一个具体的类型都对应了一个具体的TypeInformation实现类例如BasicTypeInformation中的IntegerTypeInformation和FractionalTypeInformation都具体的对应了一个TypeInformation。
然后还有BasicArrayTypeInformation、CompositeType以及一些其它类型也都具体对应了一个TypeInformation。
TypeInformation是Flink类型系统的核心类。
对于用户自定义的Function来说Flink需要一个类型信息来作为该函数的输入输出类型即TypeInfomation。
该类型信息类作为一个工具来生成对应类型的序列化器TypeSerializer并用于执行语义检查比如当一些字段在作为join或grouping的键时检查这些字段是否在该类型中存在。
在Flink序列化过程中进行序列化操作必须要有序列化器那么序列化器从何而来
每一个具体的数据类型都对应一个TypeInformation的具体实现每一个TypeInformation都会为对应的具体数据类型提供一个专属的序列化器。
通过
Flink的序列化过程图可以看到TypeInformation会提供一个createSerialize()方法通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象TypeSerializer。
Flink可以自动生成对应的序列化器能非常高效地对数据集进行序列化和反序列化比如BasicTypeInfo、WritableTypeInfo等但针对GenericTypeInfo类型Flink会使用Kyro进行序列化和反序列化。
其中Tuple、Pojo和CaseClass类型是复合类型它们可能嵌套一个或者多个数据类型。
在这种情况下它们的序列化器同样是复合的。
它们会将内嵌类型的序列化委托给对应类型的序列化器。
简单的介绍下Pojo的类型规则即在满足一些条件的情况下才会选用Pojo的序列化进行相应的序列化与反序列化的一个操作。
即类必须是Public的且类有一个public的无参数构造函数该类以及所有超类中的所有非静态no-static、非瞬态no-transient字段都是public的和非最终的final或者具有公共getter和setter方法该方法遵循getter和setter的Java
bean命名约定。
当用户定义的数据类型无法识别为POJO类型时必须将其作为GenericType处理并使用Kryo进行序列化。
Flink自带了很多TypeSerializer子类大多数情况下各种自定义类型都是常用类型的排列组合因而可以直接复用如果内建的数据类型和序列化方式不能满足你的需求Flink的类型信息系统也支持用户拓展。
若用户有一些特殊的需求只需要实现
TypeInformation、TypeSerializer和TypeComparator即可定制自己类型的序列化和比较大小方式来提升数据类型在序列化和比较时的性能。
序列化就是将数据结构或者对象转换成一个二进制串的过程在Java里面可以简单地理解成一个byte数组。
而反序列化恰恰相反就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。
下面就以内嵌型的Tuple3这个对象为例简述一下它的序列化过程。
Tuple3包含三个层面一是int类型一是double类型还有一个是Person。
Person包含两个字段一是int型的ID另一个是
类型的name它在序列化操作时会委托相应具体序列化的序列化器进行相应的序列化操作。
从图中可以看到Tuple3
int类型通过IntSerializer进行序列化操作此时int只需要占用四个字节就可以了。
根据int占用四个字节这个能够体现出Flink可序列化过程中的一个优势即在知道数据类型的前提下可以更好的进行相应的序列化与反序列化操作。
相反如果采用Java的序列化虽然能够存储更多的属性信息但一次占据的存储空间会受到一定的损耗。
Person类会被当成一个Pojo对象来进行处理PojoSerializer序列化器会把一些属性信息使用一个字节存储起来。
同样其字段则采取相对应的序列化器进行相应序列化在序列化完的结果中可以看到所有的数据都是由MemorySegment去支持。
MemorySegment在Flink中会将对象序列化到预分配的内存块上它代表1个固定长度的内存默认大小为32
kb。
MemorySegment代表Flink中的一个最小的内存分配单元相当于是Java的一个byte数组。
每条记录都会以序列化的形式存储在一个或多个MemorySegment中。
Flink常见的应用场景有四种即注册子类型、注册自定义序列化器、添加类型提示、手动创建TypeInformation具体如下
如果函数签名只描述了超类型但是它们实际上在执行期间使用了超类型的子类型那么让Flink了解这些子类型会大大提高性能。
可以在StreamExecutionEnvironment或ExecutionEnvironment中调用.registertype
对于不适用于自己的序列化框架的数据类型Flink会使用Kryo来进行序列化并不是所有的类型都与Kryo无缝连接具体注册方法在下文介绍。
有时当Flink用尽各种手段都无法推测出泛型信息时用户需要传入一个类型提示TypeHint这个通常只在Java
在某些API调用中这可能是必需的因为Java的泛型类型擦除导致Flink无法推断数据类型。
其实在大多数情况下用户不必担心序列化框架和注册类型因为Flink已经提供了大量的序列化操作不需要去定义自己的一些序列化器但是在一些特殊场景下需要去做一些相应的处理。
类型声明去创建一个类型信息的对象是通过哪种方式通常是用TypeInformation.of()方法来创建一个类型信息的对象具体说明如下
TypeInformation.of(Person.class);【2】对于泛型类需要通过TypeHint来保存泛型类型信息
TypeInfomationTuple2Integer,Integer
TypeHintTuple2Integer,Integer(){});【3】预定义常量
如BasicTypeInfo这个类定义了一系列常用类型的快捷方式对于String、Boolean、Byte、Short、Integer、Long、Float、Double、Char等基本类型的类型声明可以直接使用。
而且Flink还提供了完全等价的Types类org.apache.flink.api.common.typeinfo.Types。
特别需要注意的是flink-table模块也有一个Types类org.apache.flink.table.api.Types用于table模块内部的类型定义信息用法稍有不同。
使用IDE
通过自定义TypeInfo为任意类提供Flink原生内存管理而非Kryo使存
储更紧凑运行时也更高效。
需要注意在自定义类上使用TypeInfo注解随后创建相应的TypeInfoFactory并覆盖createTypeInfo()方法。
TypeInfo(MyTupleTypeInfoFactory.class)
TypeInfoFactoryMyTuple{Overridepublic
MyTupleTypeInfo(genericParameters.get(T0).genericParameters.get(T1));}
Flink认识父类但不一定认识子类的一些独特特性因此需要单独注册子类型。
StreamExecutionEnvironment和
ExecutionEnvironment提供registerType()方法用来向Flink注册子类信息。
ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);在registerType()方法内部会使用TypeExtractor来提取类型信息如上所示获取到的类型信息属于PojoTypeInfo及其子类那么需要将其注册到一起否则统一交给Kryo去处理Flink并不过问
对于Flink无法序列化的类型例如用户自定义类型没有registerType也没有自定义TypeInfo和TypeInfoFactory默认会交给
Kryo处理如果Kryo仍然无法处理例如Guava、Thrift、Protobuf等第三方库的一些类有两种解决方案
env.getConfig().enableForceAvro();【2】为Kryo增加自定义的Serializer以增强Kryo的功能
env.getConfig().addDefaultKryoSerializer(clazz,
serializer);注如果希望完全禁用Kryo100%使用Flink的序列化机制可以通过Kryoenv.getConfig().disableGenericTypes()的方式完成但注意一切无法处理的类都将导致异常这种对于调试非常有效。
那么就需要将数据序列化之后写入NetworkBufferPool然后下层的Task读出之后再进行反序列化操作最后进行逻辑处理。
为了使得记录以及事件能够被写入
Flink提供了数据记录序列化器RecordSerializer与反序列化器RecordDeserializer以及事件序列化器EventSerializer。
Function发送的数据被封装成SerializationDelegate它将任意元素公开为IOReadableWritable以进行序列化通过setInstance()来传入要序列化的数据。
在Flink通信层的序列化中有几个问题值得关注具体如下
在构建StreamTransformation的时候通过TypeExtractor工具确定Function的输入输出类型。
TypeExtractor类可以根据方法签名、子类信息等蛛丝马迹自动提取或恢复类型信息。
通过TypeInfomation的createSerializer()方法获取对应类型的序列化器TypeSerializer并在addOperator()的过程中执行setSerializers()
操作设置StreamConfig的TYPESERIALIZERIN1、TYPESERIALIZERIN2、
通过TypeInfomation的createSerializer()方法获取对应类型的序列化器TypeSerializer并在addOperator()的过程中执行setSerializers()操作设置StreamConfig的TYPESERIALIZERIN1
大家都应该清楚Task和StreamTask两个概念Task是直接受TaskManager管理和调度的而Task又会调用StreamTask而StreamTask中真正封装了算子的处理逻辑。
在run()方法中首先将反序列化后的数据封装成StreamRecord交给算子处理然后将处理结果通过Collector发送给下游
在构建Collector时已经确定了SerializtionDelegate)并通过RecordWriter写入器将序列化后的结果写入DataOutput最后序列化的操作交给SerializerDelegate处理实际还是通过TypeSerializer的serialize()方法完成。
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback