
1 split拆分(DataStream → SplitStream)
该方法是将一个DataStream中的数据流打上不同的标签,逻辑的拆分成多个不同类型的流,返回一个新的SplitStream,本质上还是一个数据流,只不过是将流中的数据打上了不同的标签。
下面的例子是将一个DataStream中的对应的多个数字打上“even”偶数和“odd”奇数的标签,调用split方法,传入的拆分逻辑是一个匿名内部类,new OutputSelector<Integer>(),需要指定一个泛型,即输入和拆分后的数据都是Integer类型。该匿名内部类要重写一个select方法,在select方法中实现拆分逻辑,这里使用if判断,如果是偶数,将经even字符串添加到ArrayList中;如果是奇数,将经odd字符串添加到ArrayList中,本质上是将这个字符串标签和对应的数据一一关联,返回一个SplitedStream,以后可以根据标签的名称在挑选出来想要的数据流(调用select方法)。返回的SplitedStream已经标记为过时,1.12版本已经删除了split和select方法,可以使用侧流输出代替。
DataStreamSource<Integer> numbers = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); //将数据打上标签,拆分成奇数和偶数 @Override public Iterable<String> select(Integer value) { List<String> out = new ArrayList<String>(); if (value % 2 == 0) { out.add(“even”); //将该条数据打上even的标签 } else { out.add(“odd”); //将该条数据打上odd的标签 } return out; //返回带有标签的集合 } }); |
提示:select方法中的判断逻辑不仅仅只能有两中类型的标签,也可以打上更多的标签。
2 select 选择(SplitStream → DataStream)
该方法是将一个关联上多个标签的SplitStream中的数据流根据一个或多个标签名称进行筛选,返回一个新的DataStream。select方法传入的是一个字符串类型的可变参数,即可以选取一个标签名称的数据流,也可以选取多个标签名称的数据流。
//挑选出偶数的数据流 DataStream<Integer> even = splited.select(“even”); //挑选出奇数的数据流 DataStream<Integer> odd = splited.select(“odd”); //挑选出奇数、偶数全部的数据流 DataStream<Integer> all = splited.select(“even”,”odd”); |
3 project 投影(DataStream → DataStream)
该方法只能是DataStream中的数据为Tuple类型是才可以使用该方法,project方法的功能和map方法类似,project 方法必须传入数字类型的可变参数,选择出Tuple对应下标的一到多个数据返回一个新的DataStream,该方法只有Java的API有,Scala的API没此方法。
下面的例子是将Tuple3中的下标为0,2对应的数据,即名称和年龄选取出来返回一个新的DataStream。
//使用fromElements生成数据,数据为Tuple3类型,分别代表姓名、性别、年龄 DataStreamSource<Tuple3<String, String, Integer>> users = env.fromElements( Tuple3.of(“佩奇”, “女”, 5), Tuple3.of(“乔治”, “男”, 3) ); //投影方法只可以用于类型为Tuple的DataStream,返回的数据只要姓名和年龄 DataStream<Tuple> result = users.project(0, 2); |
4 iterate迭代(DataStream → IterativeStream → DataStream)
DataStreamSource<String> strs = env.socketTextStream(“localhost”, 8888);DataStream<Long> numbers = strs.map(Long::parseLong); //对Nums进行迭代 IterativeStream<Long> iteration = numbers.iterate(); DataStream<Long> iterationBody = iteration.map(new MapFunction<Long, Long>() { @Override public Long map(Long value) throws Exception { System.out.println(“iterate input =>” + value); return value-=1; } }); //只要满足value > 0的条件,就会形成一个回路,重新的迭代 DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Long value) throws Exception { return value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){ @Override public boolean filter(Long value) throws Exception { return value <= 0; } }); |
星哥原创,未经许可,不得转载