96SEO 2026-02-20 08:48 0
水位线概述水印本质生成WatermarkWatermark策略WatermarkStrategy工具类使用Watermark策略

内置Watermark生成器单调递增时间戳分配器固定延迟的时间戳分配器
自定义WatermarkGenerator周期性Watermark生成器标记Watermark生成器Watermark策略与Kafka连接器
Flink中Watermark水印是一种用于处理事件时间eventtime的时间指示器。
它模拟了事件流中事件时间进展的概念。
事件时间是指事件实际发生的时间在分布式流处理中经常用于处理无序事件流。
然而由于网络延迟、乱序事件的到达以及分布式处理的特点事件时间可能不按顺序到达处理器。
在这种情况下处理程序需要一种机制来标识它们已经处理过的事件时间并据此生成或更新水印。
水印是一个特殊的事件包含了一个时间戳。
它表示截至到该时间戳的事件已经全部到达或预期已到达并且可以被认为是完整的。
水印告知系统在事件时间维度上处理事件的进展情况并在触发窗口计算、事件乱序处理等方面提供辅助。
水印的生成通常基于事件数据中的时间戳通过一些策略来推断出未到达的事件的时间戳。
简单的策略可以是事件时间减去一个固定的延迟值例如如果我们有一个事件的时间戳我们可以生成一个比该事件时间戳小一定固定时间的水印。
Flink通过处理数据流中的时间戳和水印来衡量事件时间进展并通过水印来驱动事件时间的处理。
可以根据应用程序的需要自定义水印生成的策略。
Watermark是水印、水位线的意思水印的出现是为了解决实时计算中的数据乱序问题它的本质是DataStream中一个带有时间戳的元素。
水位线可以看作一条特殊的数据记录它是插入到数据流中的一个标记点主要内容就是一个时间戳用来指示当前的事件时间。
通过使用水位线机制Flink能够动态地处理乱序事件并在保证准确性的同时提供低延迟的数据处理。
如果Flink系统中出现了一个WaterMarkT那么就意味着EventTimeT的数据都已经到达窗口的结束时间和T相同的那个窗口被触发进行计算了。
因此水印是Flink判断迟到数据的标准同时也是窗口触发的标记。
在程序并行度大于1的情况下会有多个流产生水印和窗口这时候Flink会选取时间戳最小的水印。
生成水位线使用assignTimestampsAndWatermarks()方法它主要用来为流中的数据分配时间戳并生成水位线来指示事件时间。
dataStream.assignTimestampsAndWatermarks(WatermarkStrategyT
watermarkStrategy);需要传入一个WatermarkStrategy作为参数也就是所谓的水位线生成策略
Flink程序需要知道事件时间戳对应的字段意味着数据流中的每个元素都需要拥有可分配的事件时间戳。
通过使用TimestampAssigner
的生成是齐头并进的其可以告诉Flink应用程序事件时间的进度。
其可以通过指定WatermarkGenerator
和WatermarkGenerator的WatermarkStrateg
WatermarkStrategy是一个接口该接口中包含了一个时间戳分配器TimestampAssigner和一个水位线生成器WatermarkGenerator。
主要负责按照既定的方式基于时间戳生成水位线*/WatermarkGeneratorT
createWatermarkGenerator(WatermarkGeneratorSupplier.Context
createTimestampAssigner(TimestampAssignerSupplier.Context
工具类WatermarkStrategy中也提供了几个常用的watermark策略并且可以在某些必要场景下构建自己的
为时间戳单调递增的情况创建水印策略,适用于有序流*/static
AscendingTimestampsWatermarks();}/***
为记录无序流的情况创建水印策略但可以设置事件无序程度的上限。
*/static
forBoundedOutOfOrderness(Duration
BoundedOutOfOrdernessWatermarks(maxOutOfOrderness);}/***
基于watermarkgeneratorsupper自定义创建水印策略
forGenerator(WatermarkGeneratorSupplier
generatorSupplier::createWatermarkGenerator;}/***
创建完全不生成水印的水印策略。
这在进行纯基于处理时间的流处理的场景中可能是有用*/static
NoWatermarksGenerator();}使用forBoundedOutOfOrderness
watermark生成器和一个lambda表达式作为时间戳分配器
4));SingleOutputStreamOperatorTuple2String,
dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.Tuple2String,
IntegerforMonotonousTimestamps().withTimestampAssigner((value,
1.直接在数据源上使用2.直接在非数据源的操作之后使用StreamExecutionEnvironment
StreamExecutionEnvironment.getExecutionEnvironment();DataStreamMyEvent
FileProcessingMode.PROCESS_CONTINUOUSLY,
100,FilePathFilter.createDefaultFilter(),
).assignTimestampsAndWatermarks(watermark
strategy);withTimestampsAndWatermarks.keyBy(
).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce(
WatermarkStrategy.forMonotonousTimestamps();2.forBoundedOutOfOrderness:
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));单调递增时间戳分配器
对于有序流主要特点就是时间戳单调增长永远不会出现迟到数据的问题。
因此当前时间戳就可以充当
watermark因为后续到达数据的时间戳不会比当前的小。
这是周期性生成水位线的最简单的场景直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//
从socket接收数据流SingleOutputStreamOperatorString
将输入数据转换为IntegerDataStreamInteger
定义Watermark策略WatermarkStrategyInteger
就触发窗口执行.IntegerforMonotonousTimestamps()//
TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数//
指定时间戳分配器从数据中提取.withTimestampAssigner(new
SerializableTimestampAssignerInteger()
将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据
指定watermark策略SingleOutputStreamOperatorInteger
dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator//
事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new
ProcessAllWindowFunctionInteger,
context.window().getStart();long
context.window().getEnd();String
DateFormatUtils.format(startTs,
input.spliterator().estimateSize();out.collect(窗口在时间区间
input.toString());}}).print();env.execute();}nc
乱序流中需要等待迟到数据到齐必须设置一个固定量的延迟时间数据流中的数据可能遇到的最大延迟。
此时生成水位线的时间戳就是当前数据流中最大的时间戳减去延迟时间的结果。
调用WatermarkStrategy.forBoundedOutOfOrderness()方法可以实现方法传入一个maxOutOfOrderness参数表示最大乱序程度它表示数据流中乱序数据时间戳的最大差值如果能确定乱序程度那么设置对应时间长度的延迟就可以等到所有的乱序数据
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//
从socket接收数据流SingleOutputStreamOperatorString
将输入数据转换为IntegerDataStreamInteger
定义Watermark策略WatermarkStrategyInteger
就触发窗口执行.IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3))//
从数据中提取.withTimestampAssigner((element,
将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据
watermark策略SingleOutputStreamOperatorInteger
dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator//
使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new
ProcessAllWindowFunctionInteger,
context.window().getStart();long
context.window().getEnd();String
DateFormatUtils.format(startTs,
input.spliterator().estimateSize();out.collect(窗口在时间区间
input.toString());}}).print();env.execute();}nc
TimestampAssigner是一个可以从事件数据中提取时间戳字段的简单函数
AssignerWithPunctuatedWatermarks}
AssignerWithPeriodicWatermarks}
每来一条事件数据调用一次可以检查或者记录事件的时间戳或者也可以基于事件数据本身去生成
ExecutionConfig#getAutoWatermarkInterval()}
观察传入的事件数据然后在框架调用onPeriodicEmit()时发出watermark
毫秒可以通过ExecutionConfig.setAutoWatermarkInterval(…)
指定。
每次都会调用生成器的onPeriodicEmit()方法如果返回的watermark非空且值大于前一个watermark则将发出新的watermark
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//
从socket接收数据流SingleOutputStreamOperatorString
将输入数据转换为IntegerDataStreamInteger
修改默认周期时间为1000msenv.getConfig().setAutoWatermarkInterval(1000);WatermarkStrategyInteger
3000L:延迟时间.IntegerforGenerator(ctx
MyWatermarkGenerator(3000L)).withTimestampAssigner((element,
将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据
1000L;});SingleOutputStreamOperatorInteger
dataStream.assignTimestampsAndWatermarks(watermarkStrategy);singleOutputStreamOperator//
使用事件时间语义窗口.windowAll(TumblingEventTimeWindows.of(Time.seconds(10))).process(new
ProcessAllWindowFunctionInteger,
context.window().getStart();long
context.window().getEnd();String
DateFormatUtils.format(startTs,
input.spliterator().estimateSize();out.collect(窗口在时间区间
input.toString());}}).print();env.execute();}/***
为每个事件调用允许水印生成器检查和记住事件时间戳或根据事件本身发出水印。
**
eventTimestamp);System.out.println(调用onEvent
调用此方法和生成水印的时间间隔取决于ExecutionConfig.getAutoWatermarkInterval()**
1));System.out.println(调用onPeriodicEmit
Watermark(System.currentTimeMillis()
标记watermark生成器会不停地检测onEvent()中的事件当发现带有水位线信息的事件时就立即发出水位线。
把发送水位线的逻辑写在onEvent方法当中即可
标记生成器将查看onEvent()中的事件数据并等待检查在流中携带watermark的特殊标记事件或打点数据。
当获取到这些事件数据时它将立即发出watermark。
通常情况下标记生成器不会通过onPeriodicEmit()发出
自定义间歇性生成器.IntegerforGenerator(ctx
MyWatermarkGenerator(3000L)).withTimestampAssigner((element,
将输入数字转时间戳单位毫秒当作数据的时间戳System.out.println(数据
eventTimestamp);output.emitWatermark(new
1));System.out.println(调用onEvent
连接器作为数据源时每个Kafka分区可能有一个简单的事件时间模式递增的时间戳或有界无序当使用Kafka数据源时多个分区常常并行使用因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式在这种情况下可以使用Flink中可识别Kafka分区的watermark生成机制。
使用此特性将在Kafka消费端内部针对每个Kafka分区生成watermark并且不同分区watermark的合并方式与在数据流shuffle时的合并方式相同。
注意
在自定义数据源中发送水位线以后就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线。
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);KafkaSourceString
指定kafka节点的地址和端口.setBootstrapServers(node01:9092,node02:9092,node03:9092)//
指定消费者组的id.setGroupId(flink_group)//
指定反序列化器反序列化value.setValueOnlyDeserializer(new
flink消费kafka的策略.setStartingOffsets(OffsetsInitializer.latest()).build();DataStreamSourceString
WatermarkStrategy.noWatermarks(),
kafka_source);DataStreamSinkString
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)),
kafka_source).print(Kafka);stream.print(Kafka);env.execute();}
如果数据源中的某一个分区/分片在一段时间内未发送事件数据则意味着WatermarkGenerator也不会获得任何新数据去生成watermark。
我们称这类数据源为空闲输入或空闲源。
在这种情况下当某些其他分区仍然发送事件数据的时候就会出现问题。
由于下游算子watermark的计算方式是取所有不同的上游并行数据源watermark的最小值则其watermark将不会发生变化。
为了解决这个问题可以使用WatermarkStrategy来检测空闲输入并将其标记为空闲状态。
WatermarkStrategy为此提供了一个工具接口
StringforBoundedOutOfOrderness(Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1));并行度下的水位线传递
在多并行度下当一个任务接收到多个上游并行任务传递来的水位线时应该以最小的那个作为当前任务的事件时钟。
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);//
从socket接收数据流SingleOutputStreamOperatorString
将输入数据转换为IntegerDataStreamInteger
将数据合理地分发到不同的分区中DataStreamInteger
定义Watermark策略WatermarkStrategyInteger
就触发窗口执行.IntegerforMonotonousTimestamps()//
将输入数字转时间戳单位毫秒当作数据的时间戳.withTimestampAssigner((r,
watermark策略SingleOutputStreamOperatorInteger
partitionCustom.assignTimestampsAndWatermarks(watermarkStrategy);//
奇数一组偶数一组SingleOutputStreamOperatorString
singleOutputStreamOperator.keyBy(a
使用事件时间语义窗.window(TumblingEventTimeWindows.of(Time.seconds(10))).process(new
context.window().getStart();long
context.window().getEnd();String
DateFormatUtils.format(startTs,
input.spliterator().estimateSize();out.collect(分组
input.toString());}});process.print();env.execute();}public
偶数窗口中没有任何数据由于当前Task是以最小的那个作为当前任务的事件时钟就会导致当前Task的水位线无法推进就导致窗口无法触发。
定义Watermark策略WatermarkStrategyInteger
就触发窗口执行.IntegerforMonotonousTimestamps()//
从数据中提取.withTimestampAssigner((r,
1000L)//空闲等待5s.withIdleness(Duration.ofSeconds(5));2
设置窗口推迟关窗时间在关窗之前迟到数据来了还能被窗口计算来一条迟到数据触发一次计算。
关窗后迟到数据不会被计算放入侧输出流
在设置一定的窗口允许迟到时间时只考虑大部分的迟到数据忽略不考虑极端小部分迟到很久的数据
在水印产生时设置一个乱序容忍度推迟系统时间的推进保证窗口计算被延迟执行为乱序的数据争取更多的时间进入窗口。
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10));2.设置窗口延迟关闭
Flink的窗口也允许迟到数据。
当触发了窗口计算后会先计算当前的结果但是此时并不会关闭窗口。
以后每来一条迟到数据就触发一次这条数据所在窗口计算(增量计算)。
直到wartermark
.window(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3))3.使用侧流接收迟到的数据
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(3)).sideOutputLateData(lateWS)实现示例
StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//
从socket接收数据流SingleOutputStreamOperatorString
将输入数据转换为IntegerDataStreamInteger
定义Watermark策略WatermarkStrategyInteger
WatermarkStrategy.IntegerforBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner((element,
watermark策略SingleOutputStreamOperatorInteger
dataStream.assignTimestampsAndWatermarks(watermarkStrategy);OutputTagInteger
Types.POJO(Integer.class));SingleOutputStreamOperatorString
sensorDSwithWatermark.keyBy(sensor
2).window(TumblingEventTimeWindows.of(Time.seconds(10))).allowedLateness(Time.seconds(2))
推迟2s关窗.sideOutputLateData(lateTag)
context.window().getStart();long
context.window().getEnd();String
DateFormatUtils.format(startTs,
input.spliterator().estimateSize();out.collect(分组
input.toString());}});process.print();//
从主流获取侧输出流打印process.getSideOutput(lateTag).printToErr(关窗后的迟到数据);env.execute();}
作为专业的SEO优化服务提供商,我们致力于通过科学、系统的搜索引擎优化策略,帮助企业在百度、Google等搜索引擎中获得更高的排名和流量。我们的服务涵盖网站结构优化、内容优化、技术SEO和链接建设等多个维度。
| 服务项目 | 基础套餐 | 标准套餐 | 高级定制 |
|---|---|---|---|
| 关键词优化数量 | 10-20个核心词 | 30-50个核心词+长尾词 | 80-150个全方位覆盖 |
| 内容优化 | 基础页面优化 | 全站内容优化+每月5篇原创 | 个性化内容策略+每月15篇原创 |
| 技术SEO | 基本技术检查 | 全面技术优化+移动适配 | 深度技术重构+性能优化 |
| 外链建设 | 每月5-10条 | 每月20-30条高质量外链 | 每月50+条多渠道外链 |
| 数据报告 | 月度基础报告 | 双周详细报告+分析 | 每周深度报告+策略调整 |
| 效果保障 | 3-6个月见效 | 2-4个月见效 | 1-3个月快速见效 |
我们的SEO优化服务遵循科学严谨的流程,确保每一步都基于数据分析和行业最佳实践:
全面检测网站技术问题、内容质量、竞争对手情况,制定个性化优化方案。
基于用户搜索意图和商业目标,制定全面的关键词矩阵和布局策略。
解决网站技术问题,优化网站结构,提升页面速度和移动端体验。
创作高质量原创内容,优化现有页面,建立内容更新机制。
获取高质量外部链接,建立品牌在线影响力,提升网站权威度。
持续监控排名、流量和转化数据,根据效果调整优化策略。
基于我们服务的客户数据统计,平均优化效果如下:
我们坚信,真正的SEO优化不仅仅是追求排名,而是通过提供优质内容、优化用户体验、建立网站权威,最终实现可持续的业务增长。我们的目标是与客户建立长期合作关系,共同成长。
Demand feedback