09.Flink的Transformation(三)

9.Flink的Transformation(三)"

1 split拆分(DataStream → SplitStream)

该方法是将一个DataStream中的数据流打上不同的标签,逻辑的拆分成多个不同类型的流,返回一个新的SplitStream,本质上还是一个数据流,只不过是将流中的数据打上了不同的标签。

下面的例子是将一个DataStream中的对应的多个数字打上“even”偶数和“odd”奇数的标签,调用split方法,传入的拆分逻辑是一个匿名内部类,new OutputSelector<Integer>(),需要指定一个泛型,即输入和拆分后的数据都是Integer类型。该匿名内部类要重写一个select方法,在select方法中实现拆分逻辑,这里使用if判断,如果是偶数,将经even字符串添加到ArrayList中;如果是奇数,将经odd字符串添加到ArrayList中,本质上是将这个字符串标签和对应的数据一一关联,返回一个SplitedStream,以后可以根据标签的名称在挑选出来想要的数据流(调用select方法)。返回的SplitedStream已经标记为过时,1.12版本已经删除了split和select方法,可以使用侧流输出代替。

DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//将数据打上标签,拆分成奇数和偶数
SplitStream<Integer> splited = numbers.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> out = new ArrayList<String>();
        if (value % 2 == 0) {
            out.add(“even”); //将该条数据打上even的标签
        
} else {
            out.add(“odd”); //将该条数据打上odd的标签
        
}
        return out; //返回带有标签的集合
    
}
});

9.Flink的Transformation(三)"提示:select方法中的判断逻辑不仅仅只能有两中类型的标签,也可以打上更多的标签。

2 select 选择(SplitStream → DataStream)

该方法是将一个关联上多个标签的SplitStream中的数据流根据一个或多个标签名称进行筛选,返回一个新的DataStream。select方法传入的是一个字符串类型的可变参数,即可以选取一个标签名称的数据流,也可以选取多个标签名称的数据流。

//挑选出偶数的数据流
DataStream<Integer> even = splited.select(“even”);
//挑选出奇数的数据流
DataStream<Integer> odd = splited.select(“odd”);
//挑选出奇数、偶数全部的数据流
DataStream<Integer> all = splited.select(“even”,”odd”);

3 project 投影(DataStream → DataStream)

该方法只能是DataStream中的数据为Tuple类型是才可以使用该方法,project方法的功能和map方法类似,project 方法必须传入数字类型的可变参数,选择出Tuple对应下标的一到多个数据返回一个新的DataStream,该方法只有Java的API有,Scala的API没此方法。

下面的例子是将Tuple3中的下标为0,2对应的数据,即名称和年龄选取出来返回一个新的DataStream。

//使用fromElements生成数据,数据为Tuple3类型,分别代表姓名、性别、年龄
DataStreamSource<Tuple3<String, String, Integer>> users = env.fromElements(
        Tuple3.of(“佩奇”, “女”, 5), Tuple3.of(“乔治”, “男”, 3)
);
//投影方法只可以用于类型为Tuple的DataStream,返回的数据只要姓名和年龄
DataStream<Tuple> result = users.project(0, 2);

4 iterate迭代(DataStream → IterativeStream → DataStream)

DataStreamSource<String> strs = env.socketTextStream(“localhost”, 8888);DataStream<Long> numbers = strs.map(Long::parseLong);
//对Nums进行迭代
IterativeStream<Long> iteration = numbers.iterate();
DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) throws Exception {
        System.out.println(“iterate input =>” + value);
        return value-=1;
    }
});
//只要满足value > 0的条件,就会形成一个回路,重新的迭代
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

星哥原创,未经许可,不得转载

wx