03.Flink编程快速上手

Flink编程入门

1.1 初始化Flink项目模板

1.1.1 准备工作

要求安装Maven 3.0.4 及以上版本和JDK 8

1.1.2 使用maven命令创建java项目模板

  • 执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate  -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-java -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false
  • 或者在命令行中执行下面的命令,需要有网络
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.0

1.1.3 使用maven命令创建scala项目模板

  • 执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-scala -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false
  • 或者在命令行中执行下面的命令
curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 1.12.0

1.1.4 将maven项目导入到IDEA或Eclipse

3.Flink编程快速上手"

3.Flink编程快速上手"

1.2 DataFlow编程模型

3.Flink编程快速上手"

Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink

  • Source主要负责数据的读取
  • Transformation主要负责对数据的转换操作
  • Sink负责最终计算好的结果数据输出。

1.3 Flink第一个入门程序

1.3.1 实时WordCount

从一个Socket端口中实时的读取数据,然后实时统计相同单词出现的次数,该程序会一直运行,启动程序前先使用nc -l 8888启动一个socket用来发送数据

public class StreamingWordCount {

    public static void main(String[] args) throws Exception {
        //创建流式计算的ExecutionEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //调用Source,指定Socket地址和端口
        DataStream<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1]));
        //切分压平并将单词和一放入元组中
        DataStream<Tuple2<String, Integer>> words = lines.
                flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        String[] words = line.split(” “);
                        for (String word : words) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                });
        //按照key分组并聚合
        DataStream<Tuple2<String, Integer>> result = words.keyBy(0).sum(1);
        //将结果打印到控制台
        result.print();
        //执行
        env.execute(“StreamingWordCount”);
    }
}

1.3.2 离线WordCount

从一个指定文件读取数据,然后统计相同单词出现的次数,文件中的数据被处理完后该程序停止

public class BatchWordCount {

    public static void main(String[] args) throws Exception {
        //创建离线的ExecutionEnvironment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //指定Source
        DataSet<String> lines = env.readTextFile(args[0]);
        //对数据切分压平
        DataSet<Tuple2<String, Integer>> result = lines
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> collector)
                            throws Exception {
                        //使用分隔符切分
                        String[] words = line.split(” “);
                        //循环遍历切分后的数组
                        for (String word : words) {
                            //将单词使用collector收集
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .groupBy(0) //分组
                .sum(1); //聚合
        //保存结果
        result.writeAsText(args[1]);
        //执行
        env.execute(“BatchWordCount”);
    }

1.3.3 运行程序

  • 本地运行
3.Flink编程快速上手"
  • 提交到集群运行
bin/flink run -m node-1.51doit.cn:8081 -p 4 -c cn._51doit.flink.StreamingWordCount /root/hello-flink-java-1.0.jar node-1.51doit.cn 8888

星哥原创,未经许可,不得转载

wx