07.Flink的Transformation(一)

7.Flink的Transformation(一)"

1 map映射(DataStream → DataStream)

该方法是将一个DataStream调用map方法返回一个新的DataStream。本质是将该DataStream中对应的每一条数据依次迭代出来,应用map方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中对应的每一条数据,与新生成的DataStream中数据是一一对应的,也可以说是存在着映射关系的。

public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //调用env的fromElements创建一个非并行的DataStreamSource
        
DataStreamSource<String> words = env.fromElements(
                “hadoop”,”spark”,”flink”,”hbase”,”flink”,”spark”
        );
        //在map方法中传入MapFunction实现类实例,重写map方法
        
DataStream<String> upperWords = words.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    //将每一个单词转成大写
                    
return value.toUpperCase();
                }
        });
        //调用Sink将数据打印在控制台
        
upperWords.print();
        env.execute(“MapDemo”);
    }
}

DataStream的map方法,不但可以传入一个匿名内部类,还可以传入Lambda表达式,下面的例子是将输入为long类型的每一个数字乘以10,得到一个新的DataStream。

//使用并行Source生成数据
DataStreamSource<Long> numbers = env.generateSequence(1, 9);
//调用map并传入Lambda表达式,将每一个数字乘以10,得到一个新的DataStream
DataStream<Long> multiple = numbers.map(i -> i * 10);

2 flatMap扁平化映射(DataStream → DataStream)

该方法是将一个DataStream调用flatMap方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条数据依次迭代出来,应用flatMap方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过flatMap方法传入的计算逻辑后,会返回零到多条数据。所谓的扁平化即将原来的数据压平,返回多条数据。

DataStreamSource<String> lines = env.fromElements(
        “Hadoop spark flink”, “Spark hadoop spark”, “error error”
);DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            //将一行字符串按空格切分成一个字符串数组
            
String[] arr = line.split(” “);
            //循环切分后的字符串数组
            
for (String word : arr) {
                //将过滤后的单词转成小写收集到Collector中
                
if(!word.equals(“error”)) {
                    out.collect(word.toLowerCase());
                }
            }
        }
});

下面的例子是将一行字符串切分压平,并且将单词和1组合在一起放入到Tuple2中在flatMap方法中同时完成了。

DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap(
        new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                //将一行字符串按空格切分成一个字符串数组
                
String[] arr = line.split(” “);
                for (String word : arr) {
                    //将单词转成小写放入到Collector中
                    
out.collect(Tuple2.of(word.toLowerCase(), 1));
                }
            }
        }
);

如果是调用flatMap方法时传入Lambda表达式,需要在调用flatMap方法后,在调用returns方法指定返回的数据的类型。不然Flink无法自动推断出返回的数据类型,会出现异常。

DataStream<Tuple2<String, Integer>> wAndOne = lines.flatMap(
        (String line, Collector<Tuple2<String, Integer>> out) -> {
            Arrays.asList(line.split(“\\W+”)).forEach(word -> {
                out.collect(Tuple2.of(word.toLowerCase(), 1));
            });
        }
).returns(Types.TUPLE(Types.STRING, Types.INT));  //使用returns指定返回数据的类型

3 filter过滤(DataStream → DataStream)

该方法是将一个DataStream调用filter方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条输入数据依次迭代出来,应用filter方法传入的过滤逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过fliter方法传入的过滤逻辑后,返回true就会保留这条数据,返回false就会过滤掉该数据。

DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
//过滤掉奇数,保留偶数
DataStream<Integer> even = numbers.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value % 2 == 0; //过滤掉返回false的数组
        }
});

下面例子是使用Lambda表达式的方式过滤掉不是以h开头的单词。

DataStreamSource<String> words = env.fromElements(“hbase”, “hadoop”, “flink”);
//过滤掉不是h开头的单词
DataStream<String> filtered = words.filter(w -> w.startsWith(“h”));

4 union 合并(DataStream * → DataStream)

该方法可以将两个或者多个数据类型一致的DataStream合并成一个DataStream。DataStream<T> union(DataStream<T>… streams)可以看出DataStream的union方法的参数为可变参数,即可以合并两个或多个数据类型一致的DataStream。

下面的例子是使用fromElements生成两个DataStream,一个是基数的,一个是偶数的,然后将两个DataStream合并成一个DataStream。

//使用fromElements创建两个DataStream
DataStreamSource<Integer> odd = env.fromElements(1,3,5,7,9);
DataStreamSource<Integer> even = env.fromElements(2,4,6,8,10);
//将两个DataStream合并到一起
DataStream<Integer> result = odd.union(even);

5 connect连接(DataStream,DataStream→ConnectedStreams)

connect翻译成中文意为连接,可以将两个数据类型一样也可以类型不一样DataStream连接成一个新的ConnectedStreams。需要注意的是,connect方法与union方法不同,虽然调用connect方法将两个流连接成一个新的ConnectedStreams,但是里面的两个流依然是相互独立的,这个方法最大的好处是可以让两个流共享State状态,状态相关的内容在后面章节讲解

//使用fromElements创建两个DataStream
DataStreamSource<String> word = env.fromElements(“a”,”b”,”c”,”d”);
DataStreamSource<Integer> num = env.fromElements(1,3,5,7,9);
//将两个DataStream连接到一起
ConnectedStreams<String, Integer> connected = word.connect(num);

6 coMap(ConnectedStreams → DataStream)

coMap并不是一个方法,而是对ConnectedStreams调用map方法。调用map方法,传入的计算逻辑是一个匿名内部类,new CoMapFunction<String, Integer, String>(),需要输入三个泛型:第一个String类型代表第一个DataStream输入的数据类型;第二个Integer类型代表第二个DataStream输入的数据类型;第三个String类型代返回DataStream的数据类型。这个匿名类要重写两个方法,一个是map1方法,是对第一个流进行map的处理逻辑。另一个是map2方法,是对二个流进行map的处理逻辑,这两个方法都必须返回String类型。最终返回一个新的DataStream中的数据是String。

//将两个DataStream连接到一起
ConnectedStreams<String, Integer> wordAndNum = word.connect(num);
//对ConnectedStreams中两个流分别调用个不同逻辑的map方法
DataStream<String> result = wordAndNum.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) throws Exception {
        return value.toUpperCase(); //第一个map方法是将第一个流的字符变大写
    
}
    @Override
    public String map2(Integer value) throws Exception {
        return String.valueOf(value * 10); //第二个map方将是第二个流的数字乘以10并转成String
    
}
});

7 coFlatMap(ConnectedStreams → DataStream)

coFlatMap并不是一个方法,而是对ConnectedStreams调用flatMap方法。调用flatMap方法,传入的计算逻辑是一个匿名内部类,new CoFlatMapFunction<String, String, String>(),需要指定三个泛型:第一个String类型代表第一个DataStream输入的数据类型;第二个String类型代表第二个DataStream输入的数据类型;第三个String类型代返回DataStream的数据类型。这个匿名类要重写两个方法,一个是flatMap1方法,是对第一个流进行flatMap的处理逻辑。另一个是flatMap2方法,是对二个流进行flatMap的处理逻辑,这两个方法都必须返回String都要通过各自的Collector收集,最终返回一个新的DataStream中的数据是String。

//使用fromElements创建两个DataStream
DataStreamSource<String> word = env.fromElements(“a b c”, “d e f”);
DataStreamSource<String> num = env.fromElements(“1,2,3”, “4,5,6”);
//将两个DataStream连接到一起
ConnectedStreams<String, String> connected = word.connect(num);
//对ConnectedStreams中两个流分别调用个不同逻辑的flatMap方法
DataStream<String> result = connected.flatMap(new CoFlatMapFunction<String, String, String>() {
    @Override
    public void flatMap1(String value, Collector<String> out) throws Exception {
        String[] words = value.split(” “);
        for (String w : words) {
            out.collect(w);
        }
    }
    @Override
    public void flatMap2(String value, Collector<String> out) throws Exception {
        String[] nums = value.split(“,”);
        for (String n : nums) {
            out.collect(n);
        }
    }
});

星哥原创,未经许可,不得转载

wx