12.Flink窗口算子(一)

1.1 Window Assinger

当调用window或windowAll方法时,所传入的参数就是Window Assigner(窗口分配器),其作用是决定划分什么样类型的窗口,即以何种条件划分窗口,输入的数据以何种方式分配到窗口内,窗口如何触发等等。Flink提供了Tumbling windows(滚动窗口)、Sliding windows(滑动窗口), Session windows(会话窗口)和Global windows(全局窗口,另外CountWindow属于Global windows)。这些自带的Window Assigner可以满足大多数的场景,如果有特殊需要可以继承WindowAssigner这个抽象类实现自己的Window Assigner。在以上四种内置Window Assigner中,除了Global windows,其他三种都属于时间类型的窗口,它们既可以按照ProcessingTime划分窗口,也可以按照EventTime划分窗口。

时间类型的窗口,都有窗口的起始时间和结束时间,时间的类型是timestamp格式(timestamp的值是指从1970年1月1日0时0分0秒到现在的long类型毫秒数)。窗口的起始时间、结束时间是前闭后开的,即包括起始时间,不包括结束时间,并且窗口的起始时间和结束时间必须是窗口长度的整数倍。例如一个滚动窗口的长度为10秒,起始时间是2020-01-01 00:00:00,那么对应的timestamp格式就是[1577808000000, 1577808010000)。窗口的start就是1577808000000,窗口的end就是1577808010000,窗口的maxTimestamp就是窗口的end减1即1577808009999。

1.1.1 Tumbling Windows滚动窗口

滚动窗口是按照时间划分的窗口,其Assinger会将输入的每一条数据按照时间分配到固定长度的窗口内,并且按照这个固定的时间进行滚动,窗口和窗口之间没有数据重叠。

(1) Non-Keyed Tumbling Windows

下面的代码是滚动窗口的基本使用:

DataStream<Integer> numbers = …numbers.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))); DataStream<Integer> numbers = …numbers.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)));
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
   if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
      return windowAll(TumblingProcessingTimeWindows.of(size));
   } else {
      return windowAll(TumblingEventTimeWindows.of(size));
   }
}

(2) Keyed Tumbling Windows

  DataStream<Tuple2<String, Integer>> wordAndOne = …;
//EventTime滚动窗口
wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))  //窗口长度为5秒
        .sum(1); //触发窗口对窗口内的数据进行sum运算
//ProcessingTime滚动窗口
wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //窗口长度为5秒
        .sum(1); //触发窗口对窗口内的数据进行sum运算

wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) //窗口长度为1天,同时将数据转换成对应的时区的时间
        .sum(1); //触发窗口对窗口内的数据进行sum运算

除了上面几种创建滚动窗口的方方法,Flink还提供了一个更简单的方发创建滚动窗口,

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
   if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
      return window(TumblingProcessingTimeWindows.of(size));
   } else {
      return window(TumblingEventTimeWindows.of(size));
   }
}

窗口的时间间隔也可以指定其他类型,例如:Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x)、Time.hours(x)、Time.days(x)。除此之外还可以调用Time的of方法,传入数字和时间单位TimeUnit,例如Time.of(1, TimeUnit.HOURS)。

TumblingWindows的of方法如果指定一个参数,就会按照指定的时间周期性的滚动形成新的窗口,例如TumblingProcessingTimeWindows.of(Time.days(1)),那么窗口的起始时间是以当前系统的ProcessingTime的整点开始以小时为单位对齐。例如[1:00:00.000, 1:59:59.999]对应一个窗口,[2:00:00.000, 2:59:59.999]会对应下一个窗口,并且会不断的生成窗口。(为了方便描述,才使用1:00:00.000这种格式,窗口的时间其实是timestamp格式)。

TumblingWindows的of方法还可以传入2个参数,第二个参数的作用是将时间调整成指定时区的时间。在UTC-0以外的时区,就需要指定一个偏移量进行调整。例如,在中国就必须指定Time.hours(-8)的偏移量。

1.1.2 Sliding Windows 滑动窗口

滑动窗口是按照时间划分的窗口,其Assinger会将输入的每一条数据按照时间分配到固定长度的窗口内,并且还可以指定一个额外的滑动参数用来指定窗口滑动的频率(也叫滑动步长),因此当滑动步长小于窗口的长度时,窗口和窗口之间有数据重叠。

(1) Non-Keyed Sliding Windows

下面的代码是滑动窗口的基本使用:

DataStream<Integer> numbers = …numbers.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))); DataStream<Integer> numbers = …numbers.windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))); 
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
   if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
      return windowAll(SlidingProcessingTimeWindows.of(size, slide));
   } else {
      return windowAll(SlidingEventTimeWindows.of(size, slide));
   }
}

(2) Keyed Sliding Windows

//EventTime滑动窗口wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) //窗口长度为10秒,5秒钟滑动一次
        .sum(1); //触发窗口对窗口内的数据进行sum运算
//ProcessingTime滑动窗口
wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))) //窗口长度为10秒,5秒钟滑动一次
        .sum(1);

wordAndOne
        .keyBy(0) //指定key selector 分组字段//窗口长度为12小时,1小时滑动一次,同时将数据转换成对应的时区的时间
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .sum(1); //触发窗口对窗口内的数据进行sum运算
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
   if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
      return window(SlidingProcessingTimeWindows.of(size, slide));
   } else {
      return window(SlidingEventTimeWindows.of(size, slide));
   }
}

SlidingWindows的of方法如果指定两个参数,第一个参数为窗口的长度,第二个为滑动的频率(或加滑动步长)。例如SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)),那么窗口的起始时间是以数据对应的EventTime并且是滑动步长的整数倍为单位对齐。例如[1:00:00.000, 1:00:09.999]对应一个窗口,[1:00:05.000, 1:00:14.999]会对应下一个窗口,两窗口有数据重叠,并且会不断的生成窗口。

1.1.3 Session Windows 会话窗口

会话窗口是按照时间间隔划分窗口的,当超过指定的时间间隔,就会划分一个新的窗口。会话窗口没有固定的起始时间和结束时间,窗口中的数据也不会重叠。会话窗口可以指定一个固定的时间间隔,也可以根据数据中的信息传入一个函数计算出一个动态变化的时间间隔。

下面的代码是会话窗口的基本使用:

//EventTime会话窗口wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(EventTimeSessionWindows.withGap(Time.minutes(10))) //指定固定的时间间隔为10分钟
        .sum(1); //触发窗口对窗口内的数据进行sum运算

wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(EventTimeSessionWindows.withDynamicGap((element) -> {
            return element.f1 * 1000; //指定一个动态的时间间隔,根据数据的f1字段乘以1000得到,返回的是long类型
        }))
        .sum(1); //触发窗口对窗口内的数据进行sum运算
//ProcessingTime会话窗口
wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .sum(1); //触发窗口对窗口内的数据进行sum运算
wordAndOne
        .keyBy(0) //指定key selector 分组字段
        .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
            return element.f1 * 1000; //指定一个动态的时间间隔,根据数据的f1字段乘以1000得到,返回的是long类型
        }))
        .sum(1); //触发窗口对窗口内的数据进行sum运算

1.1.4 Global Windows 全局窗口

全局窗口将key相同的数据都分配到一个单独的窗口中,每一种key对应一个全局窗口,多个全局窗口之间是相互独立的。如果是Non-Keyed Windows,就仅有一个全局窗口。全局窗口没有结束的边界,使用的Trigger(触发器)是NeverTrigger。如果不对全局窗口指定一个触发器,窗口是不会触发计算的,

下面的代码是全局窗口的基本使用:

wordAndOne.keyBy(0) //指定key selector 分组字段
        .window(GlobalWindows.create())  //调用window方法传入GlobalWindows,该窗口永远不会触发。
        .sum(1);

Count Windows属于Global Windows并指定了CountTrigger,下面是countWindowAll和countWindow方法的源代码:

//countWindowAll方法源代码,调用windowAll方法,传入GlobalWindows,并指定CountTriggerpublic AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
   return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}//countWindow方法源代码,调用window方法,传入GlobalWindows,并指定CountTriggerpublic WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
   return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
}
  • Non-Keyed Count Windows

是按照窗口中接收到数据的条数划分窗口的,跟时间无关。如果没有达到指定的条数,窗口不会被触发执行。

下面的例子是不分组调用countWindowAll划分窗口,具体逻辑是将输入的数字转成int,然后按照输入数据的条数划分窗口,当数据达到3条,就会形成一个完整的窗口,触发窗口并窗口内的3条数据集进行sum运算。

DataStreamSource<String> lines = env.socketTextStream(“localhost”, 8888); //使用nc -lk 8888命令输入数字
DataStream<Integer> numbers = lines.map(Integer::parseInt); //将字符转成int//不分组,使用countWindowAll划分窗口,窗口中数量达到3条,触发窗口执行
AllWindowedStream<Integer, GlobalWindow> windowed = numbers.countWindowAll(3);
DataStream<Integer> summed = windowed.sum(0); //将窗口中的数字相加summed.print(); //使用print sink打印输出结果

先使用nc -lk 8888启动netcat服务,然后启动上面的countWindowAll的例子,在命令行中输入依次输入:

123456

如果输入的数据小于3条,不会触发窗口,当数据输入为3条时,窗口触发并使用对应的sink输出结果。然后再重新计数。

  • Keyed Count Windows

下面的例子是先调用keyBy分组后再调用countWindow划分窗口,具体逻辑是将输入的单词和1,组合放入到Tuple2中,然后调用keyBy按照单词分组,一个组对应一个窗口。当一个组内的数据达到3条,就会触发这个组对应的窗口并对组内的数据进行sum运算。

DataStreamSource<String> words = env.socketTextStream(“localhost”, 8888);
DataStream<Tuple2<String, Integer>> wordAndOne = words
        .map(w -> Tuple2.of(w, 1)) //将单词和1组合放入一个Tuple2
        .returns(Types.TUPLE(Types.STRING, Types.INT)); //使用了Lambda表达式,要明确指定返回类型
WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> windowed = wordAndOne
        .keyBy(0) //先分组后再划分Count窗口
        .countWindow(3); //一个组内达数据达到3条,该组的数据触发执行
DataStream<Tuple2<String, Integer>> summed = windowed.sum(1);  //对窗口的数据进行sum
summed.print(); //使用print sink打印输出结果

同样使用nc -lk 8888启动netcat服务,然后启动上面的countWindow的例子,在命令行中输入依次输入:

flinksparksparkflinkflinkflinkspark

如果相同的单词输入的小于3条,即同一个组内的数据没有达到3条,窗口不会触发,当同一组内的数据达到3条时,这个组对应的窗口就会触发并使用相应的sink输出结果。然后这个组再重新计数。

1.2 ProcessingTime类型窗口

1.2.1 TumblingProcessingTimeWindows

在使用Processing Time作为时间标准时,是按照window operator算子所在机器的系统的时间,周期性的滚动。

(1)Non-Keyed TumblingProcessingTimeWindows

下面的例子是不分组调用timeWindowAll划分窗口,具体逻辑是将输入的数字转成int,然后按照Processing Time 5秒钟划分一个窗口,即按照系统时间5秒钟形成一个窗口,如果窗口中有数据就进行sum运算。

DataStream<Integer> numbers = …//滚动窗口,5秒钟滚动一次
AllWindowedStream<Integer, TimeWindow> window = numbers
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));
//对窗口中的数字进行sum
DataStream<Integer> summed = window.sum(0);

先使用nc -lk 8888启动netcat服务,然后启动上面的timeWindowAll的例子,在一个窗口形成的5秒钟内输入1、2、3等窗口触发后得到的结果为6。在下一个窗口形成的5秒钟内输入4、5再触发得到的结果为9。

12345

(2)Keyed TumblingProcessingTimeWindows

下面的例子是先调用keyBy分组后再调用timeWindow划分窗口,由于默认使用的是Processing Time,所以调用timeWindow方法传入滚动窗口的时间,等价于调用window方法再传入TumblingProcessingTimeWindows并制定窗口的时间。具体逻辑是将输入的单词和1,组合放入到Tuple2中,再调用keyBy按照单词分组,然后按照Processing Time 5秒钟划分一个窗口,即按照系统时间5秒钟形成一个窗口,一个窗口内可以有多个组,窗口触发就会对该窗口中的每一个组的数据进行sum运算。

DataStream<Tuple2<String, Integer>> wordAndOne = …KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//按照ProcessingTime划分滚动窗口,窗口长度为5秒
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window1 = keyed.timeWindow(Time.seconds(5));
//也可以调用window方法,传入具体的window类型,并指定窗口长度,所以window1和window2是一样的
//WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window2 = keyed
//        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
//对窗口每一个组进行聚合
DataStream<Tuple2<String, Integer>> summed = window1.sum(1);

同样使用nc -lk 8888启动netcat服务,然后启动上面的timeWindow的例子,在一个窗口形成的5秒钟内输入flink、spark、flink得到的结果为(flink, 2)和(spark, 1)。在下一个窗口形成的5秒钟内输入spark、flink、spark得到的结果为(flink, 1)和(spark, 2)。

flinksparkflinksparkflinkspark

1.2.2 SlidingProcessingTimeWindows

在使用Processing Time作为时间标准时,是按照window operator算子所在机器的系统的时间,周期性的滑动,并且窗口的长度大于滑动步长。

(1)Non-Keyed SlidingProcessingTimeWindows

下面的例子是不分组调用timeWindowAll,窗口长度为10秒,5秒钟滑动一次,具体逻辑是将输入的数字转成int,然后按照Processing Time 5秒钟滑动一次形成一个窗口,如果窗口中有数据就进行sum运算。需要注意的是,下一次滑动时,新的窗口内新输入的数据还要和该窗口其余的历史数据进行累加。

DataStream<Integer> numbers = …

//对窗口内的数字进行sum
DataStream<Integer> summed = window1.sum(0);

先使用nc -lk 8888启动netcat服务,然后启动上面的timeWindowAll的例子,在一个窗口形成的5秒钟内输入1、2、3等窗口触发后得到的结果为6。在下一个5秒钟内输入4、5得到的结果为15。

1234578

(2)Keyed SlidingProcessingTimeWindows

下面的例子是先调用keyBy分组后再调用timeWindow划分窗口,指定窗口长度为10秒,每5秒钟滑动一次,具体逻辑是将输入的单词和1,组合放入到Tuple2中,再调用keyBy按照单词分组,然后按照Processing Time 5秒钟滑动一次,即按照系统时间5秒钟滑动一次就形成一个新的窗口,一个窗口内可以有多个组,窗口触发就会对该窗口中的每一个组的数据进行sum运算。需要注意的是,下一次滑动时,新的窗口内新输入的数据分组后还要和该窗口其余的历史数据key相同的组进行累加。

DataStream<Tuple2<String, Integer>> wordAndOne = …KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//滑动窗口,窗口长度为10秒,滑动步长为5秒
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window1 = keyed
        .timeWindow(Time.seconds(10), Time.seconds(5));//也可以调用window方法,传入具体的window类型,并指定窗口长度,和滑动步长,所以window1和window2是一样的//WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> window2 = keyed
//        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)));
//对窗口内每一个组进行sum
DataStream<Tuple2<String, Integer>> summed = window1.sum(1);

1.2.3 ProcessingTimeSessionWindows

会话窗口是按照时间划分窗口的,在使用Processing Time作为时间标准时,window operator算子所在机器的系统的时间,减去上一条数据的Processing Time,如果大于指定的时间间隔就会形成一个新的窗口。

  • Non-Keyed ProcessingTimeSessionWindows

下面的例子是不分组调用windowAll方法传入ProcessingTimeSessionWindows划分窗口,并指定时间间隔为5秒。具体逻辑是将输入的数字转成int,如果窗口中有数据就进行sum运算。

DataStream<Integer> numbers = …
//不分组,划分会话窗口,如果系统的时间减去上一条数据的Processing Time大于5秒就形成一个新的窗口并触发执行
AllWindowedStream<Integer, TimeWindow> windowed = numbers
        .windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
//对窗口内的数字进行sum
DataStream<Integer> summed = windowed.sum(0);

(2)Keyed ProcessingTimeSessionWindows

下面的例子是分组后调用window方法传入ProcessingTimeSessionWindows划分窗口,并指定时间间隔为5秒。一个窗口内可以有多个组,窗口触发就会对该窗口中的每一个组的数据进行sum运算。

//先分组再划分会话窗口
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
WindowedStream<Tuple2<String, Integer>, Tuple, TimeWindow> windowed = keyed
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
//将窗口内每一个组的数字sum
DataStream<Tuple2<String, Integer>> summed = windowed.sum(0);

星哥原创,未经许可,不得转载

wx