
1 map映射(DataStream → DataStream)
该方法是将一个DataStream调用map方法返回一个新的DataStream。本质是将该DataStream中对应的每一条数据依次迭代出来,应用map方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中对应的每一条数据,与新生成的DataStream中数据是一一对应的,也可以说是存在着映射关系的。
public class MapDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //调用env的fromElements创建一个非并行的DataStreamSource DataStreamSource<String> words = env.fromElements( “hadoop”,”spark”,”flink”,”hbase”,”flink”,”spark” ); //在map方法中传入MapFunction实现类实例,重写map方法 DataStream<String> upperWords = words.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { //将每一个单词转成大写 return value.toUpperCase(); } }); //调用Sink将数据打印在控制台 upperWords.print(); env.execute(“MapDemo”); } } |
DataStream的map方法,不但可以传入一个匿名内部类,还可以传入Lambda表达式,下面的例子是将输入为long类型的每一个数字乘以10,得到一个新的DataStream。
//使用并行Source生成数据 DataStreamSource<Long> numbers = env.generateSequence(1, 9); //调用map并传入Lambda表达式,将每一个数字乘以10,得到一个新的DataStream DataStream<Long> multiple = numbers.map(i -> i * 10); |
2 flatMap扁平化映射(DataStream → DataStream)
该方法是将一个DataStream调用flatMap方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条数据依次迭代出来,应用flatMap方法传入的计算逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过flatMap方法传入的计算逻辑后,会返回零到多条数据。所谓的扁平化即将原来的数据压平,返回多条数据。
DataStreamSource<String> lines = env.fromElements( “Hadoop spark flink”, “Spark hadoop spark”, “error error” );DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> out) throws Exception { //将一行字符串按空格切分成一个字符串数组 String[] arr = line.split(” “); //循环切分后的字符串数组 for (String word : arr) { //将过滤后的单词转成小写收集到Collector中 if(!word.equals(“error”)) { out.collect(word.toLowerCase()); } } } }); |
下面的例子是将一行字符串切分压平,并且将单词和1组合在一起放入到Tuple2中在flatMap方法中同时完成了。
DataStream<Tuple2<String, Integer>> wordAndOne = lines.flatMap( new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception { //将一行字符串按空格切分成一个字符串数组 String[] arr = line.split(” “); for (String word : arr) { //将单词转成小写放入到Collector中 out.collect(Tuple2.of(word.toLowerCase(), 1)); } } } ); |
如果是调用flatMap方法时传入Lambda表达式,需要在调用flatMap方法后,在调用returns方法指定返回的数据的类型。不然Flink无法自动推断出返回的数据类型,会出现异常。
DataStream<Tuple2<String, Integer>> wAndOne = lines.flatMap( (String line, Collector<Tuple2<String, Integer>> out) -> { Arrays.asList(line.split(“\\W+”)).forEach(word -> { out.collect(Tuple2.of(word.toLowerCase(), 1)); }); } ).returns(Types.TUPLE(Types.STRING, Types.INT)); //使用returns指定返回数据的类型 |
3 filter过滤(DataStream → DataStream)
该方法是将一个DataStream调用filter方法返回一个新的DataStream,本质上是将该DataStream中的对应的每一条输入数据依次迭代出来,应用filter方法传入的过滤逻辑,返回一个新的DataStream。原来的DataStream中输入的一条数据经过fliter方法传入的过滤逻辑后,返回true就会保留这条数据,返回false就会过滤掉该数据。
DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); //过滤掉奇数,保留偶数 DataStream<Integer> even = numbers.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value % 2 == 0; //过滤掉返回false的数组 } }); |
下面例子是使用Lambda表达式的方式过滤掉不是以h开头的单词。
DataStreamSource<String> words = env.fromElements(“hbase”, “hadoop”, “flink”); //过滤掉不是h开头的单词 DataStream<String> filtered = words.filter(w -> w.startsWith(“h”)); |
4 union 合并(DataStream * → DataStream)
该方法可以将两个或者多个数据类型一致的DataStream合并成一个DataStream。DataStream<T> union(DataStream<T>… streams)可以看出DataStream的union方法的参数为可变参数,即可以合并两个或多个数据类型一致的DataStream。
下面的例子是使用fromElements生成两个DataStream,一个是基数的,一个是偶数的,然后将两个DataStream合并成一个DataStream。
//使用fromElements创建两个DataStream DataStreamSource<Integer> odd = env.fromElements(1,3,5,7,9); DataStreamSource<Integer> even = env.fromElements(2,4,6,8,10); //将两个DataStream合并到一起 DataStream<Integer> result = odd.union(even); |
5 connect连接(DataStream,DataStream→ConnectedStreams)
connect翻译成中文意为连接,可以将两个数据类型一样也可以类型不一样DataStream连接成一个新的ConnectedStreams。需要注意的是,connect方法与union方法不同,虽然调用connect方法将两个流连接成一个新的ConnectedStreams,但是里面的两个流依然是相互独立的,这个方法最大的好处是可以让两个流共享State状态,状态相关的内容在后面章节讲解
//使用fromElements创建两个DataStream DataStreamSource<String> word = env.fromElements(“a”,”b”,”c”,”d”); DataStreamSource<Integer> num = env.fromElements(1,3,5,7,9); //将两个DataStream连接到一起 ConnectedStreams<String, Integer> connected = word.connect(num); |
6 coMap(ConnectedStreams → DataStream)
coMap并不是一个方法,而是对ConnectedStreams调用map方法。调用map方法,传入的计算逻辑是一个匿名内部类,new CoMapFunction<String, Integer, String>(),需要输入三个泛型:第一个String类型代表第一个DataStream输入的数据类型;第二个Integer类型代表第二个DataStream输入的数据类型;第三个String类型代返回DataStream的数据类型。这个匿名类要重写两个方法,一个是map1方法,是对第一个流进行map的处理逻辑。另一个是map2方法,是对二个流进行map的处理逻辑,这两个方法都必须返回String类型。最终返回一个新的DataStream中的数据是String。
//将两个DataStream连接到一起 ConnectedStreams<String, Integer> wordAndNum = word.connect(num); //对ConnectedStreams中两个流分别调用个不同逻辑的map方法 DataStream<String> result = wordAndNum.map(new CoMapFunction<String, Integer, String>() { @Override public String map1(String value) throws Exception { return value.toUpperCase(); //第一个map方法是将第一个流的字符变大写 } @Override public String map2(Integer value) throws Exception { return String.valueOf(value * 10); //第二个map方将是第二个流的数字乘以10并转成String } }); |
7 coFlatMap(ConnectedStreams → DataStream)
coFlatMap并不是一个方法,而是对ConnectedStreams调用flatMap方法。调用flatMap方法,传入的计算逻辑是一个匿名内部类,new CoFlatMapFunction<String, String, String>(),需要指定三个泛型:第一个String类型代表第一个DataStream输入的数据类型;第二个String类型代表第二个DataStream输入的数据类型;第三个String类型代返回DataStream的数据类型。这个匿名类要重写两个方法,一个是flatMap1方法,是对第一个流进行flatMap的处理逻辑。另一个是flatMap2方法,是对二个流进行flatMap的处理逻辑,这两个方法都必须返回String都要通过各自的Collector收集,最终返回一个新的DataStream中的数据是String。
//使用fromElements创建两个DataStream DataStreamSource<String> word = env.fromElements(“a b c”, “d e f”); DataStreamSource<String> num = env.fromElements(“1,2,3”, “4,5,6”); //将两个DataStream连接到一起 ConnectedStreams<String, String> connected = word.connect(num); //对ConnectedStreams中两个流分别调用个不同逻辑的flatMap方法 DataStream<String> result = connected.flatMap(new CoFlatMapFunction<String, String, String>() { @Override public void flatMap1(String value, Collector<String> out) throws Exception { String[] words = value.split(” “); for (String w : words) { out.collect(w); } } @Override public void flatMap2(String value, Collector<String> out) throws Exception { String[] nums = value.split(“,”); for (String n : nums) { out.collect(n); } } }); |
星哥原创,未经许可,不得转载