08.Flink的Transformation(二)

8.Flink的Transformation(二)"

1 keyBy按key分组(DataStream → KeyedStream)

下面的例子是将一行字符串进行flatMap切分成多个单词,然后将单词和1放入到Tuple2中,Tuple2中的第一个元素是单词,第二个元素的数字1。经过flatMap处理后,返回一个新的DataStream<Tuple2<String, Integer>>,然后调用keyBy算子,传入的是单词在Tuple2中对应的下标值,因为单词是在第0个位置,所以keyBy传入的值为0,分完组后,将key相同的即在同一个组内的次数进行sum,因为次数在Tuple2的下标值为1,所以传入的值为1。

//使用fromElements模拟生成的数据
DataStreamSource<String> lines = env.fromElements(
        “flink spark hadoop hbase hive”, “spark flink hadoop hue hbase”
);
//将一行数据切分后得到的每一个单词和1组合放入到Tuple2中
DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
        new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(“\\W+”);
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }
);
//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);

8.Flink的Transformation(二)"提示:Java中没有元组这种数据类型,在Flink的Java API中,为了方便封装数据,特别定义了Tulpe类型,Tuple是一个抽象类,有Tuple0到Tuple25共26个实现类,例如Tuple3可以存放3个元素,类型可以不一样,下标分别是0,1,2。

只有一个DataStream中数据是Tuple类型,调用keyBy算子时可传入参数为分组字段对应的下标值。来开一下keyBy方法的方法签名:KeyedStream<T, Tuple> keyBy(int… fields),keyBy方法的参数是一个可变参数,说明可以按照一到多个字段进行分组,就类似数据库SQL语句group by后面可以跟一到多个分组条件一样。需求说明的是,KeyedStream<T, Tuple>中的两个泛型,第一个代表数据分组之前的类型,第二个类型为Tuple,即分组的key,由于keyBy可以按照一到多个字段分组,如果是按照一个字段就是Tuple1,按照两个字段分组就是Tuple2,为了可以通用,所以使用父类型Tuple。

如果DataStream中的数据类型不是Tuple,比如是一个自定义的JavaBean,例如下面定义的一个CountBean,里面有两个public的成员变量,分别是String类型的word,还有一个是Integer类型的count,分别代表单词和次数。

8.Flink的Transformation(二)"提示:在Flink开发中,如果定义一个JavaBean来封装数据,通常定义成public的成员变量,这是为了以后赋值和反射更加方便,如果定义了有参的构造方法,那么一定要再定义一个无参的构造方法,不然运行时会出现异常。并且最好定义一个静态的of方法用来赋值,这样更加方便。可以查看一下Flink中Tuple的源代码也是这么实现的。

public class CountBean {
    public String word;
    public Integer count;
    public CountBean(){}
    public CountBean(String word, Integer count) {
        this.word = word;
        this.count = count;
    }
    public static CountBean of(String word, Integer count) {
        return new CountBean(word, 1);
    }
    @Override
    public String toString() {
        return “CountBean{” + “word='” + word + ‘\” + “, count=” + count + ‘}’;
    }    @Override
    public int hashCode() {
       return word.hashCode();
    }
}

下面的例子是将一个DataSteam调用flatMap时,切分压平后将单词和1封装到CountBean中,得到一个新的DataStream<CountBean>,然后在调用keyBy进行分组,这个时候keyBy方法就不能传入数字了,可以传入要分组的字段名称,并且也可以按照多个指定分组,传入多个字段名称即可。

//将一行数据切分后得到的每一个单词和1组合放入到自定义的bean实例中
DataStream<CountBean> wordAndOne = lines.flatMap(
        new FlatMapFunction<String, CountBean>() {
            @Override
            public void flatMap(String line,Collector<CountBean> out) throws Exception {
                String[] words = line.split(“\\W+”);
                for (String word : words) {
                    //将切分后的但是循环放入到bean中
                    
out.collect(CountBean.of(word, 1));
                }
            }
        }
);
//按照Bean中的属性名word进行分组
KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy(“word”);

2 aggregates聚合(KeyedStream → DataStream)

aggregates并不是一个算子,而是多个聚合算子的统称,其中有sum、min、minBy、max、maxBy这些算子。一个KeyedStream调用其中一个aggregates方法返回一个新的DataStream。本质上是将该KeyedStream一个组内持续输入的数据按照对应的聚合逻辑进行实时滚动地聚合,生成一个新的DataStream。

2.1. sum

该算子实现实时滚动相加的功能,即新输入的数据和历史数据进行相加。只有KeyedStream才可以调用sum方法,如果Tuple类型数据,可以传入一个要聚合的字段对应Tuple的下标(数字)。

//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);//将分组后的数据进行sum运算
DataStream<Tuple2<String, Integer>> result = keyed.sum(1);

如果是自定义的POJO类型数据,可以传入一个要聚合的字段名称。

//按照Bean中的属性名word进行分组
KeyedStream<CountBean, Tuple> keyed = wordAndOne.keyBy(“word”);
//按照Bean中的属性名count进行sum聚合
DataStream<CountBean> result = keyed.sum(“count”);

2.2. min、minBy

这两个算子都是实现滚动比较最小值的功能,只有KeyedStream才可以调用min、minBy方法,如果Tuple类型数据,可以传入一个要比较的字段对应Tuple的下标(数字),如果是自定义的POJO类型数据,可以传入一个要聚合的字段名称。

min和minBy的区别在于,min返回的是参与分组的字段和要比较字段的最小值,如果数据中还有其他字段,其他字段的值是总是第一次输入的数据的值。而minBy返回的是要比较的最小值对应的全部数据。

//使用fromElements模拟生成的数据
DataStreamSource<Tuple3<String, String, Integer>> wordAndCount = env.fromElements(
        Tuple3.of(“佩奇”, “女”, 5), Tuple3.of(“瑞贝卡”, “女”, 6), Tuple3.of(“苏西”, “女”, 4),
        Tuple3.of(“乔治”, “男”, 5), Tuple3.of(“理查德”, “男”, 4), Tuple3.of(“丹尼”, “男”, 5)
);
KeyedStream<Tuple3<String, String, Integer>, Tuple> keyedStream = wordAndCount.keyBy(1);
//将分组后的数据进行调用min、minBy
DataStream<Tuple3<String, String, Integer>> min1 = keyedStream.min(2);
DataStream<Tuple3<String, String, Integer>> min2 = keyedStream.minBy(2);
DataStream<Tuple3<String, String, Integer>> min3 = keyedStream.minBy(2, false);
//调用print sink打印结果
min1.print(“min “);
min2.print(“minBy “);
min3.print(“minBy last “);

上面min方法返回最终的结果是:(佩奇,女,4) 和 (乔治,男,3)。可以看出参与分组的性别和参与比较的年龄是正确的,但是第一个名称不正确,而是第一次出现的数据。

上面minBy方法返回最终的结果是:(瑞贝卡,女,4) 和 (乔治,男,3)。可以看出参与分组的性别和参与比较的年龄是正确的,还有姓名也是分组后最小值对应的名称,说明minBy返回的是最小值对应的那一条整体数据。另外该方法还可以传入一个Boolean值,默认是true,即如果要比较的值相等,true就是返回最先出现的最小值,false返回最后出现的最小值。

2.3. max、maxBy

这两个算子都是实现滚动比较最大值的功能,用法和min、minBy相同。

3 reduce归约(KeyedStream → DataStream)

该方法是将一个KeyedStream调用reduce方法返回一个新的DataStream,本质上是将该KeyedStream一个组内持续输入的数据按照传入的聚合逻辑进行滚动地聚合,生成一个新的DataStream。

//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
//将分组后的数据进行reduce
DataStream<Tuple2<String, Integer>> reduced = keyed.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                
t1.f1 += t2.f1; //将元组对应的次数进行累加
                return t1; //返回累加后的元组
            
}
        }
);

4 fold折叠(KeyedStream → DataStream)

该方法是将一个KeyedStream调用fold方法返回一个新的DataStream,该方法与reduce方法类似,只不过该方法需要传入两个参数,第一个参数是初始值,第二个是聚合逻辑。该方法已经被标记为过时,1.12版本已被移除。

//按照Tuple2中的第0个位置进行分组,分组后得到KeyedStream
KeyedStream<Tuple2<String, Integer>, Tuple> keyed = wordAndOne.keyBy(0);
DataStream<Tuple2<String, Integer>> result = keyed.fold(Tuple2.of(null, 0), //指定初始值
        
new FoldFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> fold(Tuple2<String, Integer> accumulator,
                    Tuple2<String, Integer> input) throws Exception {
                input.f1 += accumulator.f1; //将初始值或中间累加的结果跟输入的数据进行累加
                
return input;
            }
        }
);

星哥原创,未经许可,不得转载

wx