10.Flink的窗口和时间类型

窗口运算

流式计算是一种被设计用于处理无限数据集的数据计算引擎,所谓无限数据集是指一种源源不断的数据流抽象成的集合。而Window就是一种将无限数据集切分成多个有限数据集并对每一个有限数据集分别进行处理的手段。Window本质上是将数据流按照一定的规则,逻辑地切分成很多个有限大小的“bucket”桶,这样就可以对每一个在“桶里面”的有限的数据依次地进行计算了。

流式计算引擎的特点是每输入一条数据就立即处理,延迟低。然而在一些场景下偏偏希望将数据先攒成一个个小批次,然后对每一个小批次再进行运算。例如用FlinkDataStream API将数据进行实时的聚合后,再将结果实时的写入到数据库,每输入一条数据都会输出一个聚合后的结果,如果数据量非常大,那么对外部的数据库的写入压力就比较大。而划分成Window,即将每一个Window中的有限的数据先聚合成一条或几条,再写入到数据库中,虽然延迟变高了,但是对数据库的写入压力变小了。Window操作可以认为是微批次准实时的计算,这样Flink DataStream API 既可以实现高效的实时运算,又可以实现微批次的准实时运算,让Flink在实时计算领域更加强大和灵活。

1.1 时间类型

Flink实时计算划分窗口时,如果使用时间作为划分窗口的依据,时间有不同的类型,分为Event Time、Ingestion Time、Processing Time。Flink默认使用的是Processing Time,程序运行如果使用不同的时间类型,计算的结果完全不同,可以根据实际需求选择使用具体哪一种时间类型。

0.Flink的窗口和时间类型"

1.1.1 Event Time 事件时间

在大数据领域,日志服务器生成的一条数据也可以称为一个事件。Event Time是指在数据产生时该设备上对应的时间,这个时间在进入Flink之前已经存在于数据记录中了。以后数据被Flink处理数据,如果使用Event Time作为时间标准,那么数据并不是按照Event Time的先后顺序被处理的,由于数据可能产生在多个不同的日志服务器,然后通常是再将数据写入到分布性消息中间件,然后被被Flink拉取进行处理时,处理的实际时间相对于数据产生的实际肯定有一定的延迟,并且Event Time可能也是乱序的。那么为什么还要使用Event Time呢?是因为使用Event Time时,Flink程序可以处理乱序事件和延迟数据。并且最重要的功能就是可以统计在数据产生时,对应时间的数据指标。

1.1.2 Ingestion Time 进入时间

Ingestion Time指的是事件数据进入到Flink的时间。每条数据的Ingestion Time就是进入到Source Operator时所在机器的系统时间。比如Flink从Kafka消息中间件消费数据,每一条数据的Ingestion Time就是FlinkKafkaConsumer拉取数据进入到TaskManager对应的时间。Ingestion Time介于Event Time和Processing Time之间,与 Event Time 相比,Ingestion Time程序无法处理任何无序事件或延迟数据,并且程序不必指定如何生成水,Flink会自动分配时间戳和自动生成水位线。

1.1.3 Processing Time 处理时间

Processing Time是指事件数据被Operator处理时所在机器的系统时间,是Flink默认使用的时间标准,它提供了最好的性能和最低的延迟。但是,Flink是一个在分布式的计算框架,数据从产生到被处理会有一定的延迟(例如从消息队列拉取数据到Source,Source再到处理的Operator会有一定的延迟),所以Processing Time无法精准的体现出数据在产生的那个时刻的变化情况。

1.1.4 设置时间标准

在不设置任何的时间标准的情况下,默认使用的是ProcessingTime,如果要使用某一种时间类型作为作为时间标准,那么就要使用StreamExecutionEnvironment的setStreamTimeCharacteristic,传入TimeCharacteristic其中的一个的枚举类型参数。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置EventTime作为时间标准
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//设置IngestionTime作为时间标准
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
//设置ProcessingTime作为时间标准
//env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

星哥原创,未经许可,不得转载

wx