1 Flink编程入门
1.1 初始化Flink项目模板
1.1.1 准备工作
要求安装Maven 3.0.4 及以上版本和JDK 8
1.1.2 使用maven命令创建java项目模板
- 执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-java -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false |
- 或者在命令行中执行下面的命令,需要有网络
curl https://flink.apache.org/q/quickstart.sh | bash -s 1.12.0 |
1.1.3 使用maven命令创建scala项目模板
- 执行maven命令,如果maven本地仓库没有依赖的jar,需要有网络
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-scala -DarchetypeVersion=1.12.0 -DgroupId=cn._51doit.flink -DartifactId=flink-scala -Dversion=1.0 -Dpackage=cn._51doit.flink -DinteractiveMode=false |
- 或者在命令行中执行下面的命令
curl https://flink.apache.org/q/quickstart-scala.sh | bash -s 1.12.0 |
1.1.4 将maven项目导入到IDEA或Eclipse


1.2 DataFlow编程模型

Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink
- Source主要负责数据的读取
- Transformation主要负责对数据的转换操作
- Sink负责最终计算好的结果数据输出。
1.3 Flink第一个入门程序
1.3.1 实时WordCount
从一个Socket端口中实时的读取数据,然后实时统计相同单词出现的次数,该程序会一直运行,启动程序前先使用nc -l 8888启动一个socket用来发送数据
public class StreamingWordCount { public static void main(String[] args) throws Exception { //创建流式计算的ExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //调用Source,指定Socket地址和端口 DataStream<String> lines = env.socketTextStream(args[0], Integer.parseInt(args[1])); //切分压平并将单词和一放入元组中 DataStream<Tuple2<String, Integer>> words = lines. flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception { String[] words = line.split(” “); for (String word : words) { collector.collect(Tuple2.of(word, 1)); } } }); //按照key分组并聚合 DataStream<Tuple2<String, Integer>> result = words.keyBy(0).sum(1); //将结果打印到控制台 result.print(); //执行 env.execute(“StreamingWordCount”); } } |
1.3.2 离线WordCount
从一个指定文件读取数据,然后统计相同单词出现的次数,文件中的数据被处理完后该程序停止
public class BatchWordCount { public static void main(String[] args) throws Exception { //创建离线的ExecutionEnvironment final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //指定Source DataSet<String> lines = env.readTextFile(args[0]); //对数据切分压平 DataSet<Tuple2<String, Integer>> result = lines .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception { //使用分隔符切分 String[] words = line.split(” “); //循环遍历切分后的数组 for (String word : words) { //将单词使用collector收集 collector.collect(Tuple2.of(word, 1)); } } }) .groupBy(0) //分组 .sum(1); //聚合 //保存结果 result.writeAsText(args[1]); //执行 env.execute(“BatchWordCount”); } } |
1.3.3 运行程序
- 本地运行

- 提交到集群运行
bin/flink run -m node-1.51doit.cn:8081 -p 4 -c cn._51doit.flink.StreamingWordCount /root/hello-flink-java-1.0.jar node-1.51doit.cn 8888 |
星哥原创,未经许可,不得转载