
1 keyBy按key分组(DataStream → KeyedStream)
下面的例子是将一行字符串进行flatMap切分成多个单词,然后将单词和1放入到Tuple2中,Tuple2中的第一个元素是单词,第二个元素的数字1。经过flatMap处理后,返回一个新的DataStream<Tuple2<String, Integer>>,然后调用keyBy算子,传入的是单词在Tuple2中对应的下标值,因为单词是在第0个位置,所以keyBy传入的值为0,分完组后,将key相同的即在同一个组内的次数进行sum,因为次数在Tuple2的下标值为1,所以传入的值为1。
//使用fromElements模拟生成的数据 DataStreamSource<String> lines = env.fromElements( “flink spark hadoop hbase hive”, “spark flink hadoop hue hbase” ); //将一行数据切分后得到的每一个单词和1组合放入到Tuple2中 DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap( new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { String[] words = line.split(“\\W+”); for (String word : words) { out.collect(Tuple2.of(word, 1)); } } } ); //按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); |
提示:Java中没有元组这种数据类型,在Flink的Java API中,为了方便封装数据,特别定义了Tulpe类型,Tuple是一个抽象类,有Tuple0到Tuple25共26个实现类,例如Tuple3可以存放3个元素,类型可以不一样,下标分别是0,1,2。
只有一个DataStream中数据是Tuple类型,调用keyBy算子时可传入参数为分组字段对应的下标值。来开一下keyBy方法的方法签名:KeyedStream<T, Tuple> keyBy(int… fields),keyBy方法的参数是一个可变参数,说明可以按照一到多个字段进行分组,就类似数据库SQL语句group by后面可以跟一到多个分组条件一样。需求说明的是,KeyedStream<T, Tuple>中的两个泛型,第一个代表数据分组之前的类型,第二个类型为Tuple,即分组的key,由于keyBy可以按照一到多个字段分组,如果是按照一个字段就是Tuple1,按照两个字段分组就是Tuple2,为了可以通用,所以使用父类型Tuple。
如果DataStream中的数据类型不是Tuple,比如是一个自定义的JavaBean,例如下面定义的一个CountBean,里面有两个public的成员变量,分别是String类型的word,还有一个是Integer类型的count,分别代表单词和次数。
提示:在Flink开发中,如果定义一个JavaBean来封装数据,通常定义成public的成员变量,这是为了以后赋值和反射更加方便,如果定义了有参的构造方法,那么一定要再定义一个无参的构造方法,不然运行时会出现异常。并且最好定义一个静态的of方法用来赋值,这样更加方便。可以查看一下Flink中Tuple的源代码也是这么实现的。
public class CountBean { public String word; public Integer count; public CountBean(){} public CountBean(String word, Integer count) { this.word = word; this.count = count; } public static CountBean of(String word, Integer count) { return new CountBean(word, 1); } @Override public String toString() { return “CountBean{” + “word='” + word + ‘\” + “, count=” + count + ‘}’; } @Override public int hashCode() { return word.hashCode(); } } |
下面的例子是将一个DataSteam调用flatMap时,切分压平后将单词和1封装到CountBean中,得到一个新的DataStream<CountBean>,然后在调用keyBy进行分组,这个时候keyBy方法就不能传入数字了,可以传入要分组的字段名称,并且也可以按照多个指定分组,传入多个字段名称即可。
//将一行数据切分后得到的每一个单词和1组合放入到自定义的bean实例中 DataStream<CountBean> wordAndOne = lines.flatMap( new FlatMapFunction<String, CountBean>() { @Override public void flatMap(String line,Collector<CountBean> out) throws Exception { String[] words = line.split(“\\W+”); for (String word : words) { //将切分后的但是循环放入到bean中 out.collect(CountBean.of(word, 1)); } } } ); //按照Bean中的属性名word进行分组 KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy(“word”); |
2 aggregates聚合(KeyedStream → DataStream)
aggregates并不是一个算子,而是多个聚合算子的统称,其中有sum、min、minBy、max、maxBy这些算子。一个KeyedStream调用其中一个aggregates方法返回一个新的DataStream。本质上是将该KeyedStream一个组内持续输入的数据按照对应的聚合逻辑进行实时滚动地聚合,生成一个新的DataStream。
2.1. sum
该算子实现实时滚动相加的功能,即新输入的数据和历史数据进行相加。只有KeyedStream才可以调用sum方法,如果Tuple类型数据,可以传入一个要聚合的字段对应Tuple的下标(数字)。
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);//将分组后的数据进行sum运算 DataStream<Tuple2<String, Integer>> result = keyed.sum(1); |
如果是自定义的POJO类型数据,可以传入一个要聚合的字段名称。
//按照Bean中的属性名word进行分组 KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy(“word”); //按照Bean中的属性名count进行sum聚合 DataStream<CountBean> result = keyed.sum(“count”); |
2.2. min、minBy
这两个算子都是实现滚动比较最小值的功能,只有KeyedStream才可以调用min、minBy方法,如果Tuple类型数据,可以传入一个要比较的字段对应Tuple的下标(数字),如果是自定义的POJO类型数据,可以传入一个要聚合的字段名称。
min和minBy的区别在于,min返回的是参与分组的字段和要比较字段的最小值,如果数据中还有其他字段,其他字段的值是总是第一次输入的数据的值。而minBy返回的是要比较的最小值对应的全部数据。
//使用fromElements模拟生成的数据 DataStreamSource<Tuple3<String, String, Integer>> wordAndCount = env.fromElements( Tuple3.of(“佩奇”, “女”, 5), Tuple3.of(“瑞贝卡”, “女”, 6), Tuple3.of(“苏西”, “女”, 4), Tuple3.of(“乔治”, “男”, 5), Tuple3.of(“理查德”, “男”, 4), Tuple3.of(“丹尼”, “男”, 5) ); KeyedStream<Tuple3<String, String, Integer>, Tuple> keyedStream = wordAndCount.keyBy(1); //将分组后的数据进行调用min、minBy DataStream<Tuple3<String, String, Integer>> min1 = keyedStream.min(2); DataStream<Tuple3<String, String, Integer>> min2 = keyedStream.minBy(2); DataStream<Tuple3<String, String, Integer>> min3 = keyedStream.minBy(2, false); //调用print sink打印结果 min1.print(“min “); min2.print(“minBy “); min3.print(“minBy last “); |
上面min方法返回最终的结果是:(佩奇,女,4) 和 (乔治,男,3)。可以看出参与分组的性别和参与比较的年龄是正确的,但是第一个名称不正确,而是第一次出现的数据。
上面minBy方法返回最终的结果是:(瑞贝卡,女,4) 和 (乔治,男,3)。可以看出参与分组的性别和参与比较的年龄是正确的,还有姓名也是分组后最小值对应的名称,说明minBy返回的是最小值对应的那一条整体数据。另外该方法还可以传入一个Boolean值,默认是true,即如果要比较的值相等,true就是返回最先出现的最小值,false返回最后出现的最小值。
2.3. max、maxBy
这两个算子都是实现滚动比较最大值的功能,用法和min、minBy相同。
3 reduce归约(KeyedStream → DataStream)
该方法是将一个KeyedStream调用reduce方法返回一个新的DataStream,本质上是将该KeyedStream一个组内持续输入的数据按照传入的聚合逻辑进行滚动地聚合,生成一个新的DataStream。
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); //将分组后的数据进行reduce DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception { t1.f1 += t2.f1; //将元组对应的次数进行累加 return t1; //返回累加后的元组 } } ); |
4 fold折叠(KeyedStream → DataStream)
该方法是将一个KeyedStream调用fold方法返回一个新的DataStream,该方法与reduce方法类似,只不过该方法需要传入两个参数,第一个参数是初始值,第二个是聚合逻辑。该方法已经被标记为过时,1.12版本已被移除。
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0); DataStream<Tuple2<String, Integer>> result = keyed. new @Override public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator, Tuple2<String, Integer> input) throws Exception { input.f1 += accumulator.f1; //将初始值或中间累加的结果跟输入的数据进行累加 return input; } } ); |
星哥原创,未经许可,不得转载