Flink实战教程共分三篇。本篇为第一篇,主要讲解Flink的入门、安装配置、基本术语&概念、核心原理。第篇看这里,第篇看这里

仅需一次订阅,作者所有专栏都能看

推荐【Kafkahttps://bigbird.blog.csdn.net/article/details/108770504
推荐【Flinkhttps://blog.csdn.net/hellozpc/article/details/109413465
推荐【SpringBoothttps://blog.csdn.net/hellozpc/article/details/107095951
推荐【SpringCloudhttps://blog.csdn.net/hellozpc/article/details/83692496
推荐【Mybatishttps://blog.csdn.net/hellozpc/article/details/80878563
推荐【SnowFlakehttps://blog.csdn.net/hellozpc/article/details/108248227
推荐【并发限流https://blog.csdn.net/hellozpc/article/details/107582771


文章目录

Flink概述

按照Apache官方的介绍,Flink是一个对有界和无界数据流进行状态计算的分布式处理引擎和框架。通俗地讲,Flink就是一个流计算框架,主要用来处理流式数据。其起源于2010年德国研究基金会资助的科研项目“Stratosphere”,2014年3月成为Apache孵化项目,12月即成为Apache顶级项目。Flinken在德语里是敏捷的意思,意指快速精巧。其代码主要是由 Java 实现,部分代码由 Scala实现。在Flink的世界里,一切数据都是流式的:离线数据(批数据)是有界(bounded)的流;实时数据(流数据)是无界(unbounded)的流。Flink既可以处理有界的批量数据集,也可以处理无界的实时流数据,为批处理和流处理提供了统一编程模型。如果把Storm看做第一代流处理框架、Spark Streaming(微批处理)看做第二代,那么Flink称得上是第三代流处理框架,并且是集大成者。

Flink安装部署

和几乎所有的大数据处理框架一样,使用Flink之前,我们需要在服务器上安装部署Flink框架。Flink可以以多种模式运行,既可以单机运行,也可以集群运行。集群环境下既可以独立运行,也可以依赖YARN来运行。下面详细介绍各种安装部署模式。

本地模式

本地模式即在linux服务器直接解压flink二进制包就可以使用,不用修改任何参数,用于一些简单测试场景。

下载安装包

直接在Flink官网下载安装包,如写作此文章时最新版为flink-1.11.1-bin-scala_2.11.tgz

上传并解压至linux

1
2
3
4
5
6
[root@vm1 myapp]# pwd
/usr/local/myapp

[root@vm1 myapp]# ll
总用量 435772
-rw-r--r--. 1 root root 255546057 2月 8 02:29 flink-1.11.1-bin-scala_2.11.tgz

解压到指定目录

1
[root@vm1 myapp]# tar -zxvf flink-1.11.1-bin-scala_2.11.tgz  -C /usr/local/myapp/flink/

注意运行之前确保机器上已经安装了JDK1.8或以上版本,并配置了JAVA_HOME环境变量。JDK安装可以参考这篇博文

1
2
3
4
[root@vm1 ~]# java -version
java version "1.8.0_261"
Java(TM) SE Runtime Environment (build 1.8.0_261-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.261-b12, mixed mode)

进入flink目录执行启动命令

1
2
3
4
5
6
[root@vm1 ~]# cd /usr/local/myapp/flink/flink-1.11.1/
[root@vm1 flink-1.11.1]# bin/start-cluster.sh
[root@vm1 flink-1.11.1]# jps
3577 Jps
3242 StandaloneSessionClusterEntrypoint
3549 TaskManagerRunner

执行Jps查看java进程,可以看到Flink相关进程已经启动。可以通过浏览器访问Flink的Web界面http://vm1:8081

在这里插入图片描述

能在本机浏览器访问上述页面的前提是Windows系统的hosts文件配了vm1这台服务器的主机名和IP的映射关系,并且linux服务器的防火墙已关闭。

关闭防火墙

查看linux防火墙状态

1
[root@vm1 ~]# systemctl status firewalld

临时关闭防火墙

1
[root@vm1 ~]# systemctl stop firewalld

永久关闭防火墙

1
[root@vm1 ~]# systemctl disable firewalld

关闭Flink

执行bin/stop-cluster.sh

集群模式

集群环境适合在生产环境下面使用,且需要修改对应的配置参数。Flink提供了多种集群模式,我们这里主要介绍standalone和Flink on Yarn两种模式。

Standalone模式

Standalone是Flink的独立集群部署模式,不依赖任何其它第三方软件或库。如果想搭建一套独立的Flink集群,不依赖其它组件可以使用这种模式。搭建一个标准的Flink集群,需要准备3台Linux机器。

Linux机器规划
节点类型 主机名 IP
Master vm1 192.168.174.136
Slave vm2 192.168.174.137
Slave vm3 192.168.174.138

在Flink集群中,Master节点上会运行JobManager(StandaloneSessionClusterEntrypoint)进程,Slave节点上会运行TaskManager(TaskManagerRunner)进程。

集群中Linux节点都要配置JAVA_HOME,并且节点之间需要设置ssh免密码登录,至少保证Master节点可以免密码登录到其他两个Slave节点,linux防火墙也需关闭。

设置免密登录

1)先在每一台机器设置本机免密登录自身

1
2
[root@vm1 ~]# ssh-keygen -t rsa
[root@vm1 ~]# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

在本机执行ssh登录自身,不提示输入密码则表明配置成功

1
2
[root@vm1 ~]# ssh vm1
Last login: Tue Sep 29 22:23:39 2020 from vm1

在其它机器vm2、vm3执行同样的操作:

ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

ssh vm2

ssh vm3

2)设置vm1免密登录其它机器

把vm1的公钥文件拷贝到其它机器vm2、vm3上

1
2
[root@vm1 ~]# scp ~/.ssh/id_rsa.pub root@vm2:~/
[root@vm1 ~]# scp ~/.ssh/id_rsa.pub root@vm3:~/

登录到vm2、vm3,把vm1的公钥文件追加到自己的授权文件中

1
2
[root@vm2 ~]# cat ~/id_rsa.pub  >> ~/.ssh/authorized_keys 
[root@vm3 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

如果提示没有 ~/.ssh/authorized_keys目录则可以在这台机器上执行ssh-keygen -t rsa。不建议手动创建.ssh目录!

验证在vm1上ssh登录vm2、vm3是否无需密码,不需要密码则配置成功!

1
2
3
4
5
[root@vm1 ~]# ssh vm2
Last login: Mon Sep 28 22:31:22 2020 from 192.168.174.133

[root@vm1 ~]# ssh vm3
Last login: Tue Sep 29 22:35:25 2020 from vm1

执行exit退回到本机

1
2
3
4
[root@vm3 ~]# exit
logout
Connection to vm3 closed.
[root@vm1 ~]#

3)同样方式设置其它机器之间的免密登录

在vm2、vm3上执行同样的步骤

把vm2的公钥文件拷贝到vm1、vm3

1
2
3
4
[root@vm2 ~]# scp ~/.ssh/id_rsa.pub root@vm1:~/
[root@vm2 ~]# scp ~/.ssh/id_rsa.pub root@vm3:~/
[root@vm1 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
[root@vm3 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

把vm3的公钥文件拷贝到vm1、vm2

1
2
3
4
[root@vm3 ~]# scp ~/.ssh/id_rsa.pub root@vm1:~/
[root@vm3 ~]# scp ~/.ssh/id_rsa.pub root@vm2:~/
[root@vm1 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys
[root@vm2 ~]# cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

4)验证ssh免密码登录

1
2
3
4
[root@vm2 ~]# ssh vm1
[root@vm2 ~]# ssh vm3
[root@vm3 ~]# ssh vm1
[root@vm3 ~]# ssh vm2
设置主机时间同步

如果集群内节点时间相差太大的话,会导致集群服务异常,所以需要保证集群内各节点时间一致。

执行命令yum install -y ntpdate安装ntpdate

执行命令ntpdate -u ntp.sjtu.edu.cn 同步时间

Flink安装步骤

下列步骤都是先在Master机器上操作,再拷贝到其它机器(确保每台机器都安装了jdk)

  1. 解压Flink安装包
1
[root@vm1 myapp]# tar -zxvf flink-1.11.1-bin-scala_2.11.tgz -C /usr/local/myapp/flink/
  1. 修改Flink的配置文件flink-1.11.1/conf/flink-conf.yaml

把jobmanager.rpc.address配置的参数值改为vm1

1
jobmanager.rpc.address: vm1
  1. 修改Flink的配置文件flink-1.11.1/conf/workers
1
2
3
[root@vm1 conf]# vim workers 
vm2
vm3
  1. 将vm1这台机器上修改后的flink-1.11.1目录复制到其他两个Slave节点
1
2
scp -rq /usr/local/myapp/flink vm2:/usr/local/myapp/
scp -rq /usr/local/myapp/flink vm3:/usr/local/myapp/
  1. 在vm1这台机器上启动Flink集群服务

执行这一步时确保各个服务器防火墙已关闭

进入flink目录/flink-1.11.1/bin执行start-cluster.sh

1
2
3
4
5
6
[root@vm1 ~]# cd /usr/local/myapp/flink/flink-1.11.1/
[root@vm1 flink-1.11.1]# bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host vm1.
Starting taskexecutor daemon on host vm2.
Starting taskexecutor daemon on host vm3.
  1. 查看vm1、vm2和vm3这3个节点上的进程信息
1
2
3
4
5
6
7
8
9
10
11
[root@vm1 flink-1.11.1]# jps
4983 StandaloneSessionClusterEntrypoint
5048 Jps

[root@vm2 ~]# jps
4122 TaskManagerRunner
4175 Jps

[root@vm3 ~]# jps
4101 Jps
4059 TaskManagerRunner
  1. 查看Flink Web UI界面,访问http://vm1:8081
    在这里插入图片描述

8)提交任务执行

[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar

提交任务可以在任意一台flink客户端服务器提交,本例中在vm1、vm2、vm3都可以
在这里插入图片描述

  1. 停止flink集群
1
bin/stop-cluster.sh

10)单独启动、停止进程

手工启动、停止主进程StandaloneSessionClusterEntrypoint

1
2
[root@vm1 flink-1.11.1]# bin/jobmanager.sh start
[root@vm1 flink-1.11.1]# bin/jobmanager.sh stop

手工启动、停止TaskManagerRunner(常用于向集群中添加新的slave节点)

1
2
[root@vm1 flink-1.11.1]# bin/taskmanager.sh start
[root@vm1 flink-1.11.1]# bin/taskmanager.sh stop

在容器化部署盛行的时代,Flink on Yarn应运而生。Flink on Yarn模式使用YARN 作为任务调度系统,即在YARN上启动运行flink。好处是能够充分利用集群资源,提高服务器的利用率。这种模式的前提是要有一个Hadoop集群,并且只需公用一套hadoop集群就可以执行MapReduce和Spark以及Flink任务,非常方便。因此需要先搭建一个hadoop集群。

Hadoop集群搭建

1)下载并解压到指定目录

官网下载Hadoop二进制包,上传到linux服务器,并解压到指定目录。

1
[root@vm1 ~]# tar -zxvf hadoop-2.9.2.tar.gz -C /usr/local/myapp/hadoop/

2)配置环境变量

vim /etc/profile

1
2
export HADOOP_HOME=/usr/local/myapp/hadoop/hadoop-2.9.2/
export PATH=$PATH:$HADOOP_HOME/bin

执行hadoop version查看版本号

1
2
3
[root@vm1 hadoop]# source /etc/profile
[root@vm1 hadoop]# hadoop version
Hadoop 2.9.2

3)修改hadoop-env.sh文件

修改配置export JAVA_HOME=${JAVA_HOME}指定JAVA_HOME路径:

1
export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_261/

同时指定Hadoop日志路径,先创建好目录:

1
[root@vm1]# mkdir -p /data/hadoop_repo/logs/hadoop

再配置HADOOP_LOG_DIR

1
export HADOOP_LOG_DIR=/data/hadoop_repo/logs/hadoop

4)修改yarn-env.sh文件

指定JAVA_HOME路径

1
export JAVA_HOME=/usr/local/myapp/jdk/jdk1.8.0_261/

指定YARN日志目录:

1
2
[root@vm1 ~]# mkdir -p /data/hadoop_repo/logs/yarn
export YARN_LOG_DIR=/data/hadoop_repo/logs/yarn

4)修改core-site.xml

配置NameNode的地址fs.defaultFS、Hadoop临时目录hadoop.tmp.dir

NameNode和DataNode的数据文件都会存在临时目录下的对应子目录下

1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://vm1:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/data/hadoop_repo</value>
</property>
</configuration>

6)修改hdfs-site.xml

dfs.namenode.secondary.http-address指定secondaryNameNode的http地址,本例设置vm2机器为SecondaryNameNode

1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>vm2:50090</value>
</property>
</configuration>

7)修改yarn-site.xml

yarn.resourcemanager.hostname指定resourcemanager的服务器地址,本例设置vm1机器为hadoop主节点

1
2
3
4
5
6
7
8
9
10
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>vm1</value>
</property>
</configuration>

8)修改mapred-site.xml

[root@vm1 hadoop]# mv mapred-site.xml.template mapred-site.xml

1
2
3
4
5
6
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

mapreduce.framework.name设置使用yarn运行mapreduce程序

9) 配置slaves

设置vm2、vm3为Hadoop副节点

[root@vm1 hadoop]# vim slaves

1
2
vm2
vm3

10)设置免密码登录

免密配置参考前文 设置服务器间相互免密登录

11)拷贝hadoop到其它机器

将在vm1上配置好的Hadoop目录拷贝到其它服务器

1
2
[root@vm1 hadoop]# scp -r /usr/local/myapp/hadoop/ vm2:/usr/local/myapp/
[root@vm1 hadoop]# scp -r /usr/local/myapp/hadoop/ vm3:/usr/local/myapp/

12)格式化HDFS

在Hadoop集群主节点vm1上执行格式化命令

1
2
3
[root@vm1 bin]# pwd
/usr/local/myapp/hadoop/hadoop-2.9.2/bin
[root@vm1 bin]# hdfs namenode -format

如果要重新格式化NameNode,则需要先将原来NameNode和DataNode下的文件全部删除,否则报错。NameNode和DataNode所在目录在core-site.xmlhadoop.tmp.dirdfs.namenode.name.dirdfs.datanode.data.dir属性配置

13)启动集群

直接启动全部进程

1
[root@vm1 hadoop-2.9.2]# sbin/start-all.sh

也可以单独启动HDFS

1
sbin/start-dfs.sh

也可以单独启动YARN

1
sbin/start-yarn.sh

14)查看web页面

要在本地机器http访问虚拟机先关闭linux防火墙,关闭linux防火墙请参照前文

查看HDFS Web页面:

http://vm1:50070/

查看YARN Web 页面:

http://vm1:8088/cluster

15)查看各个节点进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@vm1 ~]# jps
5026 ResourceManager
5918 Jps
5503 NameNode

[root@vm2 ~]# jps
52512 NodeManager
52824 Jps
52377 DataNode
52441 SecondaryNameNode

[root@vm3 ~]# jps
52307 DataNode
52380 NodeManager
52655 Jps

16)停止Hadoop集群

1
[root@vm1 hadoop-2.9.2]# sbin/stop-all.sh

Hadoop集群搭建完成后就可以在Yarn上运行Flink了!

第1种:在YARN中预先初始化一个Flink集群,占用YARN中固定的资源。该Flink集群常驻YARN 中,所有的Flink任务都提交到这里。这种方式的缺点在于不管有没有Flink任务执行,Flink集群都会独占系统资源,除非手动停止。如果YARN中给Flink集群分配的资源耗尽,只能等待YARN中的一个作业执行完成释放资源才能正常提交下一个Flink作业。这种方式适合小规模、短时间计算任务。

第2种:每次提交Flink任务时单独向YARN申请资源,即每次都在YARN上创建一个新的Flink集群,任务执行完成后Flink集群终止,不再占用机器资源。这样不同的Flink任务之间相互独立互不影响。这种方式能够使得资源利用最大化,适合长时间、大规模计算任务。

下面分别介绍2种方式的具体步骤。

第1种方式

不管是哪种方式,都要先运行Hadoop集群

1)启动Hadoop集群

1
[root@vm1 hadoop-2.9.2]# sbin/start-all.sh

2)将flink依赖的hadoop相关jar包拷贝到flink目录

1
2
[root@vm1]# cp /usr/local/myapp/hadoop/hadoop-2.9.2/share/hadoop/yarn/hadoop-yarn-api-2.9.2.jar /usr/local/myapp/flink/flink-1.11.1/lib
[root@vm1]# cp /usr/local/myapp/hadoop/hadoop-2.9.2/share/hadoop/yarn/sources/hadoop-yarn-api-2.9.2-sources.jar /usr/local/myapp/flink/flink-1.11.1/lib

还需要 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar ,可以从maven仓库下载并放到flink的lib目录下。

3)创建并启动flink集群

在flink的安装目录下执行

1
bin/yarn-session.sh -n 2 -jm 512 -tm 512 -d

这种方式创建的是一个一直运行的flink集群,也称为flink yarn-session

由于Yarn模式的Flink集群是由yarn来启动的,因此可以在yarn控制台,即hadoop集群管理页面查看是否有flink任务成功运行:http://vm1:8088/cluster
在这里插入图片描述

创建成功后,flink控制台会输出web页面的访问地址,可以在web页面查看flink任务执行情况:
在这里插入图片描述

控制台输出http://vm2:43243 可以认为flink的Jobmanager进程就运行在vm2上,且端口是43243。指定host、port提交flink任务时可以使用这个地址+端口

4)附着到flink集群

创建flink集群后会有对应的applicationId,因此执行flink任务时也可以附着到已存在的、正在运行的flink集群

1
2
#附着到指定flink集群
[root@vm1 flink-1.11.1]# bin/yarn-session.sh -id application_1602852161124_0001

applicationId参数是上一步创建flink集群时对应的applicationId

5) 提交flink任务

可以运行flink自带的wordcount样例:

1
[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar

在flink web页面 http://vm2:43243/ 可以看到运行记录:
在这里插入图片描述

可以通过-input和-output来手动指定输入数据目录和输出数据目录:

-input hdfs://vm1:9000/words
-output hdfs://vm1:9000/wordcount-result.txt

第2种方式

这种方式很简单,就是在提交flink任务时同时创建flink集群

1
[root@vm1 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024 ./examples/batch/WordCount.jar

需要在执行上述命令的机器(即flink客户端)上配置环境变量YARN_CONF_DIR、HADOOP_CONF_DIR或者HADOOP_HOME环境变量,Flink会通过这个环境变量来读取YARN和HDFS的配置信息。

如果报下列错,则需要禁用hadoop虚拟内存检查:

1
2
Diagnostics from YARN: Application application_1602852161124_0004 failed 1 times (global limit =2; local limit is =1) due to AM Container for appattempt_1602852161124_0004_000001 exited with  exitCode: -103
Failing this attempt.Diagnostics: [2020-10-16 23:35:56.735]Container [pid=6890,containerID=container_1602852161124_0004_01_000001] is running beyond virtual memory limits. Current usage: 105.8 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container.

修改所有hadoop机器(所有 nodemanager)的文件$HADOOP_HOME/etc/hadoop/yarn-site.xml

1
2
3
4
<property>  
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

重启hadoop集群再次运行

1
2
3
[root@vm1 hadoop-2.9.2]# sbin/stop-all.sh
[root@vm1 hadoop-2.9.2]# sbin/start-all.sh
[root@vm1 flink-1.11.1]# bin/flink run -m yarn-cluster -yjm 1024 ./examples/batch/WordCount.jar

任务成功执行,控制台输出如下。可以使用控制台输出的web页面地址vm3:44429查看任务。不过这种模式下任务执行完成后Flink集群即终止,所以输入地址vm3:44429时可能看不到结果,因为此时任务可能执行完了,flink集群终止,页面也访问不了了。
在这里插入图片描述

上述Flink On Yarn的2种方式案例中分别使用了两个命令:yarn-session.sh 和 flink run

yarn-session.sh 可以用来在Yarn上创建并启动一个flink集群,可以通过如下命令查看常用参数:

1
[root@vm1 flink-1.11.1]# bin/yarn-session.sh -h

-n :表示分配的容器数量,即TaskManager的数量

-s : 每个TaskManager的slot数量,一般根据cpu核数设定

-jm:设置jobManagerMemory,即JobManager的内存,单位MB

-tm:设置taskManagerMemory ,即TaskManager的内存,单位MB

-d: 设置运行模式为detached,即后台独立运行

-nm:设置在YARN上运行的应用的name(名字)

-id: 指定任务在YARN集群上的applicationId ,附着到后台独立运行的yarn session中

flink run命令既可以提交任务到Flink集群中执行,也可以在提交任务时创建一个新的flink集群,可以通过如下命令查看常用参数:

1
[root@vm1 flink-1.11.1]# bin/flink run -h

-m: 指定主节点(JobManger)的地址,在此命令中指定的JobManger地址优先于配置文件中的

-c: 指定jar包的入口类,此参数在jar 包名称之前

-p:指定任务并行度,同样覆盖配置文件中的值

flink run使用举例:

1)提交并执行flink任务,默认查找当前YARN集群中已有的yarn-session的JobManager

1
[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar -input hdfs://vm1:9000/hello.txt -output hdfs://vm1:9000/result_hello

2)提交flink任务时显式指定JobManager的的host的port,该域名和端口是创建flink集群时控制台输出的

1
[root@vm1 flink-1.11.1]# bin/flink run -m vm3:39921 ./examples/batch/WordCount.jar  -input hdfs://vm1:9000/hello.txt -output hdfs://vm1:9000/result_hello

3)在YARN中启动一个新的Flink集群,并提交任务

1
[root@vm1 flink-1.11.1]# bin/flink run  -m yarn-cluster  -yjm 1024 ./examples/batch/WordCount.jar -input hdfs://vm1:9000/hello.txt -output hdfs://vm1:9000/result_hello

-m yarn-cluster是固定写法,这种方式告诉flink不用去找standalone集群和yarn session集群,而是根据当前提交的job单独启动一个cluster。

Flink高可用(HA)

默认情况下,flink集群中只有一个主节点(master),即只有一个JobManager(StandaloneSessionClusterEntrypoint)进程,如果主节点的JobManager进程挂了则不能提交任务,且运行中的任务也会失败。使用高可用配置(HA,high availability),flink集群便可以从JobManager故障中恢复,避免单点故障(single point of failure,SPOF)。对应于flink的两种集群模式,有两种不同的HA配置方式。

Standalone集群HA

受虚拟机个数限制,本例中我们使用2个flink master节点(跑StandaloneSessionClusterEntrypoint进程),1个flink salve节点(跑TaskManagerRunner进程)。Flink HA还依赖zookeeper作为分布式协调工具、hdfs作为存储工具,因此需要先安装zk集群和hadoop集群。本例中zk集群、hadoop集群、flink集群都部署在相同的机器上,但是它们是相互独立的。hadoop集群采用前文中安装好的。zk集群安装参考这篇文章

集群节点进程规划:

StandaloneSessionClusterEntrypoint节点:vm1、vm2

TaskManagerRunner节点:vm3

zk节点:vm1、vm2、vm3

hadoop节点:vm1、vm2、vm3

各个进程信息

StandaloneSessionClusterEntrypoint:flink主节点进程,即JobManager进程

TaskManagerRunner:flink worker节点进程,即TaskManager进程

NameNode: HDFS管理者节点

DataNode:HDFS工作者节点

SecondaryNameNode:HDFS NameNode的备份节点

ResourceManager:hadoop yarn master节点

NodeManager:hadoop yarn work节点

QuorumPeerMain:zk进程

在开始flink HA安装配置前,确保各个机器上都设置了主机名,/etc/hosts文件配置了主机映射关系。各个主机间互相免密登录、防火墙已关闭、时间已同步。JDK已安装,且环境变量已经配置生效。Hadoop集群、zookeeper集群已搭建。

配置Flink,先在第一台机器配置,然后在复制到集群中的其它机器中。具体步骤如下:

  1. 解压Flink安装包
1
[root@vm1 myapp]# tar -zxvf flink-1.11.1-bin-scala_2.11.tgz  -C /usr/local/myapp/flink/
  1. 修改conf/flink-conf.yaml文件
1
2
3
4
5
jobmanager.rpc.address: vm1

high-availability: zookeeper
high-availability.storageDir: hdfs://vm1:9000/flink/ha/
high-availability.zookeeper.quorum: vm1:2181,vm2:2181,vm3:2181
  1. 修改conf/workers文件
1
vm3
  1. 修改conf/masters文件
1
2
vm1:8081
vm2:8081
  1. 配置好的flink目录复制到其它节点

本例配置HA时相对之前的配置只是改了workers、masters、flink-conf.yaml文件,所以只要拷贝这3个文件

1
2
3
4
5
6
[root@vm1 flink-1.11.1]# scp conf/flink-conf.yaml vm2:/usr/local/myapp/flink/flink-1.11.1/conf/
[root@vm1 flink-1.11.1]# scp conf/flink-conf.yaml vm3:/usr/local/myapp/flink/flink-1.11.1/conf/
[root@vm1 flink-1.11.1]# scp conf/workers vm2:/usr/local/myapp/flink/flink-1.11.1/conf/
[root@vm1 flink-1.11.1]# scp conf/workers vm3:/usr/local/myapp/flink/flink-1.11.1/conf/
[root@vm1 flink-1.11.1]# scp conf/masters vm2:/usr/local/myapp/flink/flink-1.11.1/conf/
[root@vm1 flink-1.11.1]# scp conf/masters vm3:/usr/local/myapp/flink/flink-1.11.1/conf/
  1. 启动Hadoop集群

如果Hadoop还没启动,则启动hadoop集群

1
2
[root@vm1 flink-1.11.1]# cd /usr/local/myapp/hadoop/hadoop-2.9.2/
[root@vm1 hadoop-2.9.2]# sbin/start-all.sh
  1. 启动zk集群

如果zookeeper还没启动,则启动zookeeper集群

在每台机器的zk安装目录下执行:

1
2
3
4
5
6
7
[root@vm1 zookeeper-3.6.1]# zkServer.sh start
[root@vm2 zookeeper-3.6.1]# zkServer.sh start
[root@vm3 zookeeper-3.6.1]# zkServer.sh start
#过几秒钟查看状态
[root@vm1 zookeeper-3.6.1]# zkServer.sh status
[root@vm2 zookeeper-3.6.1]# zkServer.sh status
[root@vm3 zookeeper-3.6.1]# zkServer.sh status
  1. 启动Flink Standalone HA集群

在主节点执行启动脚本,并查看各个节点的进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
[root@vm1 ~]# cd /usr/local/myapp/flink/flink-1.11.1/
[root@vm1 flink-1.11.1]# bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host vm1.
Starting standalonesession daemon on host vm2.
Starting taskexecutor daemon on host vm3.

[root@vm1 flink-1.11.1]# jps
9440 StandaloneSessionClusterEntrypoint
4707 NameNode
9508 Jps
4997 ResourceManager
8873 QuorumPeerMain

[root@vm2 ~]# jps
7056 QuorumPeerMain
3762 SecondaryNameNode
7476 StandaloneSessionClusterEntrypoint
3678 DataNode
3839 NodeManager
7583 Jps

[root@vm3 zookeeper-3.6.1]# cd ~
[root@vm3 ~]# jps
4580 DataNode
8421 QuorumPeerMain
4670 NodeManager
8830 TaskManagerRunner
8927 Jps
  1. 查看web页面

StandaloneSessionClusterEntrypoint(JobManager)节点都会启动Web服务,因此可以登录主节点的web页面验证

http://vm1:8081/

http://vm2:8081/

10)提交任务执行

无论在哪一台机器上提交任务,都可以被正确执行

1
2
3
[root@vm1 flink-1.11.1]#  bin/flink run ./examples/batch/WordCount.jar 
[root@vm2 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar
[root@vm3 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar
  1. 验证主进程HA切换

把vm1机器的StandaloneSessionClusterEntrypoint进程kill掉时,查看能否在vm1上提交任务

1
2
[root@vm1 flink-1.11.1]# kill 9440
[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar

通过vm2的web页面查看到任务运行成功,说明vm1上的主进程挂掉之后flink集群依然能够工作

在这里插入图片描述

再次kill掉vm2上的StandaloneSessionClusterEntrypoint进程

此时提交任务便执行失败

1
2
3
4
5
6
7
8
9
[root@vm2 flink-1.11.1]# kill 7476

[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar
Executing WordCount example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
  1. 重启vm1上的主进程

重启vm1上的StandaloneSessionClusterEntrypoint进程,看能否恢复

1
2
3
4
[root@vm1 flink-1.11.1]# bin/jobmanager.sh start
[root@vm1 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar
[root@vm2 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar
[root@vm3 flink-1.11.1]# bin/flink run ./examples/batch/WordCount.jar

由此可见,重启vm1上的flink主进程后,3台flink客户端都可以正常提交任务,恢复正常!

  1. 重启vm2上的主进程
1
2
[root@vm2 flink-1.11.1]#  bin/jobmanager.sh start
Starting standalonesession daemon on host vm2.

访问vm2的web页面,发现刚才运行的flink任务已经同步显示,flink集群全部恢复正常了!vm1、vm2的web页面内容一致。
在这里插入图片描述

  1. 停止flink集群
1
2
3
4
[root@vm1 flink-1.11.1]#  bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 8830) on host vm3.
Stopping standalonesession daemon (pid: 9440) on host vm1.
Stopping standalonesession daemon (pid: 7476) on host vm2.

Flink on Yarn模式的HA利用的是YARN的任务恢复机制。Flink on Yarn模式依赖hadoop集群,这里可以使用前文中的hadoop集群。这种模式下的HA虽然依赖YARN的任务恢复机制,但是Flink任务在恢复时,需要依赖检查点产生的快照。快照虽然存储在HDFS上,但是其元数据保存在zk中,所以也需要一个zk集群,使用前文配置好的zk集群即可。

配置步骤如下:

  1. 修改Hadoop配置文件

    设置应用程序提交的最大尝试次数

[root@vm1 hadoop]# vim yarn-site.xml

1
2
3
4
5
6
7
<property>  
<name>yarn.resourcemanager.am.max-attempts</name>
<value>3</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>

拷贝到其它机器:

1
2
[root@vm1 hadoop]# scp yarn-site.xml vm2:/usr/local/myapp/hadoop/hadoop-2.9.2/etc/hadoop/
[root@vm1 hadoop]# scp yarn-site.xml vm3:/usr/local/myapp/hadoop/hadoop-2.9.2/etc/hadoop/
  1. 修改Flink配置文件

[root@vm1 conf]# vim flink-conf.yaml

1
2
3
4
5
high-availability: zookeeper
high-availability.storageDir: hdfs://vm1:9000/flink/ha-yarn
high-availability.zookeeper.quorum: vm1:2181,vm2:2181,vm3:2181
high-availability.zookeeper.path.root:/flink-yarn
yarn.application-attempts:10

拷贝到其它机器:

1
2
[root@vm1 conf]# scp flink-conf.yaml vm2:/usr/local/myapp/flink/flink-1.11.1/conf/
[root@vm1 conf]# scp flink-conf.yaml vm3:/usr/local/myapp/flink/flink-1.11.1/conf/
  1. 启动Hadoop集群

[root@vm1 hadoop-2.9.2]# sbin/start-all.sh

  1. 启动zk集群
1
2
3
4
[root@vm1 ~]# zkServer.sh start
[root@vm2 ~]# zkServer.sh start
[root@vm3 ~]# zkServer.sh start
zkServer.sh status 查看节点状态

5)启动Flink on Yarn集群

1
2
3
4
5
6
[root@vm1 flink-1.11.1]# bin/yarn-session.sh -n 2 -d
2020-10-19 17:49:13,530 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl [] - Default schema
2020-10-19 17:49:13,557 ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState [] - Authentication failed
2020-10-19 17:49:13,588 INFO org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn [] - Session establishment complete on server vm2/192.168.174.137:2181, sessionid = 0x20001b4148c0001, negotiated timeout = 40000
2020-10-19 17:49:13,602 INFO org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: CONNECTED
JobManager Web Interface: http://vm3:38392

查看flink在hadoop中的任务信息:

http://vm1:8088/cluster
在这里插入图片描述

说明Flink的主节点进程JobManager(YarnSessionClusterEntrypoint)在vm3上,想要验证HA,只要将vm3上的Flink主节点进程JobManager(YarnSessionClusterEntrypoint)kill掉。

验证HA之前,先看下Flink任务能否正常提交运行。

1
2
3
4
5
6
7
8
9
10
11
12
[root@vm1 flink-1.11.1]#  bin/flink run ./examples/batch/WordCount.jar 
#运行上述命令,可以发现flink任务运行成功

[root@vm3 flink-1.11.1]# jps
11538 Jps
10228 NodeManager
10149 DataNode
10453 QuorumPeerMain
10902 YarnSessionClusterEntrypoint

#kill掉vm3上的flink主进程
[root@vm3 flink-1.11.1]# kill 10902

kill掉vm3的YarnSessionClusterEntrypoint之后,等一段时间,发现vm2上自动启动了YarnSessionClusterEntrypoint进程:

1
2
3
4
5
6
7
[root@vm2 flink-1.11.1]# jps
11520 QuorumPeerMain
13104 Jps
12530 YarnSessionClusterEntrypoint
11206 SecondaryNameNode
11288 NodeManager
11133 DataNode

同时Hadoop管理页面也能看到有新的flink任务出现:
在这里插入图片描述

此时再在vm1上提交任务

1
[root@vm1 flink-1.11.1]#  bin/flink run ./examples/batch/WordCount.jar 

运行成功!表明HA切换成功。

6)停止flink

启动flink时控制台有提示:

1
2
3
Available commands:
help - show these commands
stop - stop the YARN session

输入stop即可!至此Flink的两种集群方式下的HA配置讲解完毕!动手试试吧。

快速入门案例

本节以大数据处理领域的“hello world”案例即词频统计,带领初学者对基于Flink的Java应用开发窥一般而知全貌。

引入pom依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.9.2</version>
</dependency>

批处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

/**
* 使用flink实现批处理:统计一个文件中的各个单词出现的频次,并且把结果存储到文件中
*/
public class BatchWordCount {
public static void main(String[] args) throws Exception {
String inputPath = "F:\\data\\file";
String outPath = "F:\\data\\result";

//获取flink批处理的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//获取文件中的内容
DataSource<String> text = env.readTextFile(inputPath);

//对文件进行处理(分词、分组、计数)
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
counts.writeAsCsv(outPath, "\n", " ", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
env.execute("batch wordCount0");
}

public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
throws Exception {
//匹配非单词字符分割单词,并返回2元组集合
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<>(token, 1));
}
}
}
}
}

file文件

1
2
3
hello, my name is daniao,hello hello!
bigbird is a famous person,famous!
mi-xi-mi-xi

result文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
daniao 1
name 1
pig 2
big 2
famous 2
hello 5
bigbird 1
is 2
my 1
person 1
a 1
bird 1
mi 2
pird 1
xi 2

注意到代码中使用了扁平化的Map,即FlatMap。因为文本文件存在多行,如果使用Map,则是一行映射一个处理结果。使用FlatMap则是输出所有行整体的统计处理结果。读者可以对比使用Map的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class BatchWordCountWithMap {
public static void main(String[] args) throws Exception {
String inputPath = "F:\\data\\file";
String outPath = "F:\\data\\result";

//获取flink批处理的运行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//获取文件中的内容
DataSource<String> text = env.readTextFile(inputPath);

//对文件进行处理
MapOperator<String, Map<String, Integer>> counts = text.map(new Tokenizer());
counts.writeAsText(outPath, FileSystem.WriteMode.OVERWRITE).setParallelism(1);
env.execute("batch wordCount1");
}

public static class Tokenizer implements MapFunction<String, Map<String, Integer>> {
@Override
public Map<String, Integer> map(String value) throws Exception {
Map<String, Integer> wordCountMap = new HashMap<>();
//以空白字符作为分割,包括空格、换行、tab
String[] tokens = value.toLowerCase().split("\\W+");
for (String word : tokens) {
if (wordCountMap.get(word) == null) {
wordCountMap.put(word, 1);
} else {
wordCountMap.put(word, wordCountMap.get(word) + 1);
}
}
return wordCountMap;
}
}
}

输出结果:

1
2
3
4
{hello=1, pig=2, pird=1}
{big=2, xi=2, bird=1, hello=1, mi=2}
{name=1, daniao=1, is=1, hello=3, my=1}
{bigbird=1, a=1, famous=2, person=1, is=1}

流式处理

流式数据往往是连续不断的数据,我们可以在linux服务器上用netcat 模拟socket服务器,让flink接受来自网络的流数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* 使用flink实现流处理:基于socket数据源的词频统计
*/
public class StreamWordCount1 {
public static void main(String[] args) throws Exception {
//模拟socket输入源的服务器hostname(笔者使用的是虚拟机),能通过hostname访问的前提是程序运行时所在机器hosts文件做了域名/ip映射
int port = 9999;
String hostname = "vm1";
String delimiter = "\n";
//获取flink流式数据处理运行环境
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入数据
DataStreamSource<String> data = executionEnvironment.socketTextStream(hostname, port, delimiter);
//进行数据处理:文本行解析、分组、求和
DataStream<Tuple2<String, Integer>> dataStream = data
.flatMap(new Splitter())
.keyBy(value -> value.f0)
.sum(1);
//设置sink操作并行度为1
dataStream.print().setParallelism(1);
//启动计算任务
executionEnvironment.execute("word count Stream");
}

//FlatMapFunction<String, WordWithCount>第一个参数是输入类型,第二个参数是输出类型
private static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String data, Collector<Tuple2<String, Integer>> out) throws Exception {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()) + " received " + data);
String[] splits = data.split("\\s");
for (String word : splits) {
//输出一个2元组
out.collect(new Tuple2<String, Integer>(word, 1));
}
}
}
}

运行上述Java程序之前,我们先在虚拟机linux服务器vm1上运行 nc -lk 9999开启监听,等待输入

1
[root@vm1 ~]# nc -lk 9999

运行Java程序,启动成功后在linux服务器输入数据

1
2
[root@vm1 ~]# nc -lk 9999
12 33 22 22 12 aa bb aa

此时Java程序控制台输出

1
2
3
4
5
6
7
8
9
2020-10-31 15:12:36:497 received 12 33 22 22 12 aa bb aa
(12,1)
(33,1)
(22,1)
(22,2)
(12,2)
(aa,1)
(bb,1)
(aa,2)

若再次输入数据,则程序还会累加统计

1
2
3
4
5
6
7
8
9
10
[root@vm1 ~]# nc -lk 9999
12 33 22 22 12 aa bb aa
22
aa aa
1234
2020-10-31 15:12:49:017 received 22
(22,3)
2020-10-31 15:13:00:603 received aa aa
(aa,3)
(aa,4)

官方提供了一个解析命令行参数的工具类ParameterTool,我们可以参考官方的demo,运行代码时添加启动参数即可
在这里插入图片描述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
/**
* 使用flink实现流处理:基于socket数据源的词频统计
* 使用Flink自带的参数解析工具、自定义实体类
*/
public class StreamWordCount2 {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String delimiter = "\n";
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount " +
"--hostname <hostname> --port <port>', where hostname (localhost by default) " +
"and port is the address of the text server");
System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
"type the input text into the command line");
return;
}

//获取flink流式处理运行环境
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入数据
DataStreamSource<String> text = executionEnvironment.socketTextStream(hostname, port, delimiter);
//进行数据处理,FlatMapFunction<String, WordWithCount>第一个参数是输入类型,第二个参数是输出类型
SingleOutputStreamOperator<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String data, Collector<WordWithCount> collector) throws Exception {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()) + " received " + data);
String[] splits = data.split("\\s");
for (String word : splits) {
collector.collect(new WordWithCount(word.trim(), 1L));
}
}
})
.keyBy("word")
//sum函数本质上是个reduce操作
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});

//把数据打印到控制台并设置并行度
windowCounts.print().setParallelism(1);
//执行计算
executionEnvironment.execute("Socket Window WordCount2");
}

/**
* 自定义输出数据类型的实体类
*/
public static class WordWithCount {
public String word;//单词
public Long count;//计数

public WordWithCount() {
}

public WordWithCount(String word, Long count) {
this.word = word;
this.count = count;
}

@Override
public String toString() {
return "WordWithCount{" +
"word='" + word + '\'' +
", count=" + count +
'}';
}
}
}

提交flink集群运行

上述案例中我们都是直接在本地运行的Java代码,实际生产环境可以打jar包丢到集群中去运行,具体步骤如下

1)打jar包

pom文件引入打包所需的插件,使用mvn package命令打包

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件,用于将scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

打包成功后,在工程的target目录下面生成一个jar包:flinkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar

2)界面方式提交

提交job之前先运行flink集群;测试服务器开启Linux网络端口监听:nc -lk 9999

在flink的管理控制台提交job时,选择本地打好的jar包上传即可。可以在web页面指定运行时参数、入口类、并行度等信息;还可以查看Flink作业的执行计划,点击submit便可以提交运行。
在这里插入图片描述

运行后还可以在页面上取消任务、查看任务及输出,非常方便管理flink作业
在这里插入图片描述

在linux 使用nc命令监听,并输入数据;flink收到数据计算并输出,在具体干活的机器taskManager上可以看到输出
在这里插入图片描述

3)命令行方式

提交

1
2
3
[root@vm1 flink-1.11.1]# bin/flink run -c bigbird.tech.flink.streaming.StreamWordCount2 -p 1 ~/flinkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar --hostname vm2 --port 9999

Job has been submitted with JobID 41a7db588d15973cc1dee9ba5a61fca6

查看任务

1
2
3
4
5
[root@vm1 flink-1.11.1]# bin/flink list
Waiting for response...
------------------ Running/Restarting Jobs -------------------
26.10.2020 19:09:04 : 41a7db588d15973cc1dee9ba5a61fca6 : Socket Window WordCount (RUNNING)
--------------------------------------------------------------

取消

ctrl+c命令只是退出flink客户端,并不能退出正在执行的flink作业;可以在web页面取消flink任务,也可以在命令行操作,取消时需要指定jobId

1
2
3
[root@vm1 flink-1.11.1]# bin/flink cancel 41a7db588d15973cc1dee9ba5a61fca6
Cancelling job 41a7db588d15973cc1dee9ba5a61fca6.
Cancelled job 41a7db588d15973cc1dee9ba5a61fca6.

查看所有任务

1
2
3
4
5
6
7
8
9
10
11
[root@vm1 flink-1.11.1]# bin/flink list -a
Waiting for response...
No running jobs.
No scheduled jobs.
---------------------- Terminated Jobs -----------------------
26.10.2020 18:25:51 : 41f13f01de1df903a0bcb783bf65ccd1 : Socket Window WordCount (FAILED)
26.10.2020 18:37:05 : cf4b8772747a40d1aeb47e5d14c41347 : word count2 (CANCELED)
26.10.2020 18:37:42 : 64e1eff5932ddcbc10e7b592acf082f3 : Socket Window WordCount (FINISHED)
26.10.2020 18:39:13 : 52e7fc692082b3041dbdfff982d901ae : Socket Window WordCount (CANCELED)
26.10.2020 19:09:04 : 41a7db588d15973cc1dee9ba5a61fca6 : Socket Window WordCount (CANCELED)
--------------------------------------------------------------

停止集群

1
[root@vm1 flink-1.11.1]# bin/stop-cluster.sh

此时flink终止运行,web也访问不了。

Flink核心概念与原理讲解

本节着重介绍Flink的一些基础概念和运行原理。

Flink运行时架构

Flink遵循 Master-Slave 架构设计原则。Flink运行时架构由两种类型的进程组成:一个JobManager,以及多个TaskManager。运行JobManager进程的节点认为是Master节点,运行TaskManager进程的节点认为是Worker(Slave)节点。组件之间的通信都是借助于Akka框架,包括任务的状态以及 Checkpoint 触发等信息。下图展示了Flink运行时架构。
在这里插入图片描述

由图可见,应用程序通常是由Flink Clinet端提交到flink集群中执行。尽管Flink客户端并不是flink程序执行时的组成部分,但是客户端的作用是很重要的,它负责准备和发送数据流图到JobManager,此后,客户端便可以断开(detached mode)。比如我们在Linux上运行flink命令带参数-d时,便是以detached mode运行,此时ctrl+c强退只是退出客户端,并不会停止flink进程,而是后台运行。当然Clinet端也可以一直和flink服务端保持连接(attached mode),以接收程序运行报告。Flink Clinet端既可以从命令行启动,也可以是Java或者Scala客户端代码启动。

JobManager和Taskmanager可以以各种方式启动:作为 standalone集群直接在机器上启动,或者在容器中启动,或者由 YARN or Mesos等资源框架管理。TaskManagers连接到JobManagers,声明它们是可用的,可以被分配工作。

JobManager

Flink中控制应用程序执行的主进程,负责协调Flink应用程序的分布式执行。JobManager决定着何时开始调度下一个flink任务或者任务集合、对执行成功或者失败的任务作出响应、协调checkpoints、协调Flink应用程序从失败中恢复。

JobManager进程由3个不同的组件构成:

ResourceManager

ResourceManager负责Flink集群中资源的分配和重分配及资源供应。ResourceManager管理着TaskManager中的任务槽(task slots)。任务槽(slot)是flink中资源调度的基本单位。Flink为不同的环境和资源提供者(如YARN、Mesos、Kubernetes和Standalone部署)实现了多个Resourcemanager。在Standalone模式下,ResourceManager只能分发可用的TaskManager的插槽,不能自己启动新的TaskManager进程。容器环境下,当JobManager申请资源时,ResourceManager将有空闲slot的TaskManager分配给JobManager。如果ResourceManager中没有足够的slot,则可以向资源提供平台发起会话,以提供启动了TaskManager进程的容器。

Dispatcher

Dispatcher提供了REST接口来提交要执行的Flink应用程序,并为每个提交的作业启动一个新的JobMaster。它还运行Flink WebUI来提供关于Flink作业执行的信息。

JobMaster

JobMaster负责管理单个作业图的执行。多个作业可以在一个Flink集群中同时运行,每个作业都有自己的JobMaster。

JobMaster首先接收到要执行的应用程序,该应用程序包含:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有class、lib和其它资源的jar包。JobMaster把JobGraph转换成物理层面的数据流图(physical dataflow graph),也称为“执行图”( ExecutionGraph),执行图包含了所有可以并发执行的任务(task)。

JobManager向资源管理器(ResourceManager)申请必需的计算资源用于执行任务,也就是TaskManager上的槽(slot)。一旦获取到足够的资源,便将ExecutionGraph中的task分发到真正执行它的TaskManager上。在运行过程中,JobManager会负责所有需要中央协调的操作,比如checkpoints的协调。

TaskManagers

TaskManagers 也叫做workers,是Flink中的工作进程,负责执行数据流图中的task、缓冲和交换数据流。Flink集群至少有一个TaskManager运行,通常flink集群中会有多个TaskManager运行。TaskManager中资源调度的最小单元是任务槽(slot),每个TaskManager都含有一定数量的slots(槽)。槽的数量通常与每个TaskManager的可用CPU内核数呈正相关。slot的数量限制了TaskManager能够并发处理任务的数量,即Flink任务能配置的最大并行度由 TaskManager上可用的 slots数决定。

TaskManager启动时会向ResourceManager注册它的槽(slots)。收到ResourceManager的指令后,TaskManager会将一个或者多个slot分配给JobManager调用。JobManager就可以向slot分配任务来执行。执行过程中,一个TaskManager可以跟它运行同一应用程序的TaskManager交换数据。

Flink核心概念与处理流程

并行度(Parallelism)

一个算子(operator)的子任务(subtask)数,称之为该算子的并行度(parallelism)。Flink中一个Stream的并行度,可以认为是其包含的所有算子中的最大的那个并行度。适当提高并行度可以提高Job的执行效率。Parallelism有如下几种设置方式。

  1. 如果不设置的话将使用配置文件flink-conf.yaml中默认配置
1
2
3
[root@vm1 conf]# cat flink-conf.yaml | grep parallelism
# The parallelism used for programs that did not specify and other parallelism.
parallelism.default: 1
  1. 命令行提交任务时 -p 指定
1
[root@vm1 flink-1.11.1]# bin/flink run -p 4 ~/flinkdemo-1.0-SNAPSHOT-jar-with-dependencies.jar
  1. 代码中指定
1
2
3
executionEnvironment.setParallelism(n)//通过env指定全局的并行度

dataStream.print().setParallelism(1);//也可以在每个算子后面单独指定并行度

本地dev环境不使用配置文件默认配置,即我们在本地开发工具中运行flink应用程序时,如果没有设置算子的并行度,则flink默认设置并行度为当前机器的cpu核数。例如快速入门案例中,我们不设置并行度,即将代码dataStream.print().setParallelism(1);改成:dataStream.print();,则控制台输出时结果前便带了一个数字。每行结果前的数字可以认为是并行任务的任务号,笔者机器4核cpu,因此这个数字范围是1-4

1
2
[root@vm1 ~]# nc -lk 9999
hello word how are you fine thank you and you !

控制台输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
2020-10-31 15:34:08:087 received hello word how are you fine thank you and you !
1> (!,1)
3> (word,2)
2> (hello,2)
3> (how,2)
4> (and,2)
2> (are,2)
2> (thank,1)
3> (you,3)
3> (fine,1)
3> (you,4)
3> (you,5)
123456789101112

并行度设置的优先级:

Operator Level> Execution Environment Level>Client Level>System Level

即 算子级别 > env级别 > Client级别 > 系统默认级别

任务槽(Task Slots)

每个worker (TaskManager)都是一个JVM进程,可以启动单独的线程执行一个或多个子任务。为了控制TaskManager能接受多少任务,提出了任务槽的概念,每个TaskManager至少包含一个任务槽。

每个任务槽代表TaskManager的一个固定资源子集。例如,一个有三个slot的TaskManager,会将其管理内存的1/3分配给每个slot。对资源进行分槽意味着子任务不会与其他作业的子任务争夺内存资源,而是拥有特定数量的内存资源。注意,TaskManager中的slot只是对内存隔离,并没有CPU隔离。

通过调整slot的数量,用户可以定义子任务如何相互隔离。比如每个TaskManager有且仅有一个slot,则意味着每个任务组在单独的JVM中运行(例如,JVM可以在单独的容器中启动)。一个TaskManager拥有多个slot则意味着更多子任务共享同一个JVM,即可以执行多个子任务。相同JVM中的任务共享TCP连接(通过多路复用机制)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。TaskManager进程、Task Slot、任务线程关系图如下:
在这里插入图片描述

两者关系

Slot指的是TaskManager能够提供的并发执行能力,是静态的并行能力的概念。而Parallelism则是TaskManager 实际会使用的并发能力,是动态的并行能力的概念。动态占用的资源总数必须小于静态能够提供的资源数。

Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自相同的Job。同一个任务的不同子任务一般占据不同的slot,前后发生的子任务可以共享slot。插槽共享的好处是:Flink集群需要的任务槽数与要执行的Job中使用的最高并行度相同即可,而不需要计算一个程序总共包含多少任务(每个任务具有不同的并行度)。这可以提高slot资源利用率,一个slot便可以hold住这个Job的整条处理链。Slot的数量推荐设置为机器的cpu核心数。Flink任务能配置的最大并行度由TaskManager上可用的Slot数决定。如下图中的Job中,source、map、keyBy、window、apply子任务配置的并行度为6,sink操作的并行度为1,因此运行此任务,flink集群至少有6个slot。如果一个taskmanager只能提供3个,那就需要2个taskmanager。图中也展示了子任务共享slot。

在这里插入图片描述

最佳实践

下面看几个官方的例子更加清晰地理解slot和parallelism之间的关系,以及实际应用中如何设置。

  1. 下图展示了一个有3个TaskManager、每个TaskManager设置3个slot的结构图。该配置下的集群启动后共有9个可用的Slot。
    在这里插入图片描述

2)Parallelism 指定了TaskManager实际使用的并发能力,如果没有手动指定并行度则使用flink-config.yaml中的默认值。如图所示,Flink配置文件默认设置的并行度为 1,如果使用默认值,此处只会使用有9 个Slot中的1个,其余8个处于空闲状态。适当地根据集群配置、任务大小设置应用程序的并行度,才能提高Flink Job的计算效率。
在这里插入图片描述

3)下图展示了3种设置全局并行度的方式:可以修改配置文件、可以在命令行指定,也可以在应用程序env中指定。若全局指定了并行度为2,则每个算子都将占用2个slot。如图所示,9个空闲slot中的2个被使用。
在这里插入图片描述

若此处设置并行度为9,则所有的资源被使用,每个slot都被分配任务:
在这里插入图片描述

4)除了可以在应用程序的env中全局指定并行度外,也可以通过.setParallelism(x)为每个算子单独指定并行度。下图展示了在应用程序中单独指定sink操作的并行度为1,其它操作的并行度都是9,多线程处理完后最终由一个线程输出结果。

在这里插入图片描述

在这里插入图片描述

5)小结。如果Flink应用程序设置的并行度超过了集群中TaskManager可用的Slot总数,则程序一直会等待资源调度,如果超过了一定的时间(该时间可配置)则会抛出异常。

1
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources.

实际应用中如果某个算子的处理逻辑比较复杂,处理数据比较慢,则可以考虑给该算子单独增加并行度,单不要超过集群中总的slot数。在flink配置文件flink-conf.yaml 配置了taskmanager.numberOfTaskSlots参数指定slot数之后,在web管理页面也能看到集群总的slots数。
例如在连接Kafka集群时,通常建议Flink Source 侧设置的并行度不要超过Kafka Topic的分区数,Flink的一个并行度可以处理Kafka一个或多个分区的数据,并行度大于topic的分区数则造成并行度空闲,资源浪费,正如Kafka消费者线程数据一般不超过topic分区数一样。

Flink应用程序执行流程

Flink中所有对数据处理的操作称为Transformation Operator。对于整个Dataflow而言,其开始和结束分别对应着Source Operator和Sink Operator,中间计算处理过程对应着Transformation Operator。

Flink应用程序的构成:source->transformation->sink

source: 数据输入、负责读取数据源

transformation: 利用各种算子进行转换处理

sink: 数据输出、负责结果输出

一个典型的 Flink 流式应用程序代码结构和流程如下图所示:
在这里插入图片描述

在这里插入图片描述

Flink上的程序运行时,会被映射成逻辑数据流(dataflows),包含上述3个组成部分。每一个dataflow以一个或者多个sources开始、以一个或者多个sink结束。

Flink任务链

Flink采用了任务链优化技术,对于分布式执行的操作(operator),Flink将operator子任务链接到一起,由一个线程执行。说白了就是把符合某种条件的子任务合并在一起,构成一个大任务。每个执行链会在 TaskManager 上的一个独立线程中执行,因此将多个operator子任务链接到一起减少了线程数以及多线程间的切换和缓冲开销,避免了数据在算子之间传输序列化与反序列化开销,提高了总体吞吐量,同时减少了延迟,可以在特定条件下减少本地通信开销。试想如果一个任务的不同子任务分不到不同的slot执行,甚至跨TaskManager执行,其网络通信开销必然比在同一个线程中本地调用大。

那么什么样的子任务是可以合并、构成所谓的operator chain的呢,下面娓娓道来。

首先要明白flink中的数据传输形式。

在Flink中,算子之间的数据传输形式可以是one-to-one(forwarding)模式,也可以是redistributing模式,具体是哪一种形式,取决于算子的种类。

one-to-one模式:Flink Stream 维护分区及元素的顺序。比如source和map之间,map算子的子任务接收到的元素个数以及顺序和source算子的子任务产生的元素个数及顺序相同。map、filter、flatMap等算子都是one-to-one的映射关系。

redistributing模式:Stream 的分区会发生改变。每个算子的子任务依据所选择的transformation发送到不同的目标任务。比如keyBy操作根据hashCode重分区,broadcast、rebalance随机重分区。这类算子引发redistribute过程,flink中的redistribute过程类似hadoop和spark中的shuffle过程。

如果算子本身是one-to-one操作,但是后续操作并行度调整了,和前面的操作并行度不一致,也将引发redistribute过程。毕竟Flink是允许一个程序中不同的算子设置不同的并行度。

由此可见,为了满足任务链的要求,Flink Stream中的多个算子需要设置相同的并行度,并且是通过local forward方式进行连接。只有相同并行度的one-to-one操作才能链接在一起形成一个Task,原来的算子成为其subTask。并行度相同、并且是one-to-one操作,两个条件缺一不可。

下面结合一个案例,来介绍Flink是如何进行任务合并的。

如下图所示,从逻辑视图中可以看出有一个Flink Stream Job,包含source、flatMap、keyBy、sink算子。并行化视图中可以看到除了source算子并行度设为1之外的其它算子并行度都设置为2。source和flatMap并行度不一致,所以不能合并。flatMap到keyBy之间从一个one-to-one操作到一个重分区操作存在redistribute过程,所以也不能合并为执行链。只有keyBy->sink之间数据传输方式是forward直传(没有数据shuffle),且并行度相同都是2,因此满足合并任务链的条件,可以合并,占用一个slot即可。原始的4个步骤,设置并行度之后应该是7个任务,理论上会占用7个slot,优化合并之后只剩5个,可见TaskManager的slot利用率得以提升。

在这里插入图片描述

比如Wordcount的案例,我们在web页面submit new job时可以show plan查看执行计划:默认没有设置并行度时,所有算子并行度都是1,因此source和flatMap可以chain在一起,同样keyBy聚合和sink也可以合并。

在这里插入图片描述

如果我们手动设置并行度为2。那么由于source和FlatMap之间发生并行度调整,将需要Rebalance,无法合并。同理,keyBy和sink之间也是如此。整个任务拆分为4个部分。

在这里插入图片描述

改变任务链

有时单个任务业务复杂、计算量巨大,合并在一起更加耗时。而flink默认是开启了任务合并功能的,只要满足合并要求的都会自动合并。那么能不能定制此项功能呢?答案是肯定的。Operator chain的行为可以通过API在代码中指定。可以在DataStream的operator后面(如someStream.map(…))调用startNewChain()来指定从该operator开始一个新的chain(与前一个操作断链,不会被chain到前一个操作)。调用disableChaining()禁止该operator参与chaining(不会与前后的operator chain一起,前后都断开)。也可以通过evn设置全局的开关,比如调用StreamExecutionEnvironment.disableOperatorChaining()全局禁用chaining。

如果想打破slot共享机制,让某些任务独享slot则可以设置Slot group。在代码中具体的算子后调用slotSharingGroup即可,比如someStream.filter(…).slotSharingGroup(“a”)。该设置为filter操作及后续操作单独开启了一个名称为a的slot组,和前面的操作比如source就分开了。在同一个slotSharingGroup中的任务都可以共享slot。不同的slotSharingGroup中的任务一定分配到不同的slot中执行。因此如果使用了slotSharingGroup,则Flink Stream的并行度是每个slot共享组的最大并行度叠加之后的值。

本篇总结

介绍完Flink的安装配置、基本原理、入门案例之后,下面就要进入贴近实战开发的内容讲解了。掌握上述基本概念和运行原理有助于进行Flink调优。在实际项目中难免会碰到性能问题,熟悉原理,不管是代码层面调优还是配置运维调优都能游刃有余。