大数据计算框架 Flink 实战教程共分为3篇。涵盖 Flink 安装配置、基本原理、核心概念、 流处理 API 和批处理 API、存储及状态一致性、容错机制、实战案例以及面试题讲解等。

本篇为第3篇,主要讲解 Flink实战案例。第1篇看这里;第2篇看这里

仅需一次订阅,作者所有专栏都能看

推荐【Kafkahttps://bigbird.blog.csdn.net/article/details/108770504
推荐【Flinkhttps://blog.csdn.net/hellozpc/article/details/109413465
推荐【SpringBoothttps://blog.csdn.net/hellozpc/article/details/107095951
推荐【SpringCloudhttps://blog.csdn.net/hellozpc/article/details/83692496
推荐【Mybatishttps://blog.csdn.net/hellozpc/article/details/80878563
推荐【SnowFlakehttps://blog.csdn.net/hellozpc/article/details/108248227
推荐【并发限流https://blog.csdn.net/hellozpc/article/details/107582771

本篇在讲解Flink案例之前先补充了几个知识点。

文章目录

BroadCast State和BroadCast Stream

BroadCastStream是由BroadCastState(广播状态)组成的数据流。BroadcastStream通常是通过调用DataStream的 broadcast() 方法返回,该方法的入参是一个 MapStateDescriptor对象。BroadcastStream后面不支持算子操作,只能使用DataStream的connect方法去连接BroadcastStream,得到一个BroadcastConnectedStream。

BroadcastState是Flink支持的Operator State的一种。使用广播状态可以将输入流中的数据广播(broadcast)到下游的operator的每个并发Task中。由于这些数据能够被所有Task共享,因此BroadcastState经常用在一些需要共享数据的场景,比如配置分发,这样每个Task都可以读取到最新的配置文件数据。

使用Broadcast State的一般步骤是:先创建一个Keyed或Non-Keyed的Data Stream,再调用DataStream的 broadcast() 方法得到一个Broadcasted Stream,该方法传入的是一个MapStateDescriptor对象。最后将处理数据的Data Stream连接到Broadcasted Stream上得到一个BroadcastConnectedStream。这样就将Broadcast State广播到Data Stream下游operator的每个并发实例中。

若业务Data Stream是Keyed Stream,则连接到Broadcasted Stream后,添加的处理函数为KeyedBroadcastProcessFunction。

1
2
3
4
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}

泛型参数的含义如下:

KS:Keyed Stream的keyBy的元素类型

IN1:非Broadcast的Data Stream中的数据记录的类型

IN2:Broadcast Stream中的数据记录的类型

OUT:经过处理后返回数据的数据类型

若Data Stream是Non-Keyed Stream,则连接到Broadcasted Stream后,添加的处理函数为BroadcastProcessFunction。

1
2
3
4
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
}

上述泛型参数的含义与KeyedBroadcastProcessFunction中的相同,只是没有keyBy操作,所以就没有KS泛型参数。

下面一个完整的例子来阐述如何使用广播处理函数。

需求1):自定义一个数据源,构建一个广播流,定时读取配置文件。在处理流中应用最新的配置,如果业务数据流中的元素在配置文件中开启处理标记则处理,否则忽略。

编写自定义数据源读定时取配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class MyConfigSource implements SourceFunction<String> {
private boolean isRunning = true;
String confPath = "F:\\data\\conf.txt";

/**
* run方法里编写数据产生逻辑
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<String> ctx) throws Exception {
while (isRunning) {
List<String> lines = Files.readAllLines(Paths.get(confPath));
lines.forEach(ctx::collect);
Thread.sleep(30000);//每隔30秒更新一次配置
}
}

@Override
public void cancel() {
isRunning = false;
}
}

编写Flink应用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class BroadCastDemo{
public static void main(String[] args) throws Exception {
int port = 9001;
String hostname = "192.168.174.136";
String delimiter = "\n";

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//定义MapStateDescriptor来保存要广播的数据
final MapStateDescriptor<String, String> CONFIG_DESCRIPTOR = new MapStateDescriptor<>(
"config",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
//创建广播流,向下游广播配置数据
BroadcastStream<String> broadcastStream = env.addSource(new MyConfigSource()).broadcast(CONFIG_DESCRIPTOR);
//创建业务数据流
//连接socket获取输入数据
DataStreamSource<String> dataStreamSource = env.socketTextStream(hostname, port, delimiter);
dataStreamSource.connect(broadcastStream).process(new BroadcastProcessFunction<String, String, Object>() {
@Override
public void processElement(String value, ReadOnlyContext ctx, Collector<Object> out) throws Exception {
//获取广播数据
ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
String flag = broadcastState.get(value);
//如果key对应的value为1,则表示该数据要被处理
if (flag != null && flag.equals("1")) {
out.collect(value + "被处理了");
}
}

@Override
public void processBroadcastElement(String value, Context ctx, Collector<Object> out) throws Exception {
System.out.println("update config:" + value);
String[] split = value.split("=");
String key = split[0];
String flag = split[1];
//接收广播数据,更新State
BroadcastState<String, String> broadcastState = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
broadcastState.put(key, flag);
}
}).print();

env.execute("broadcast");
}
}

配置文件conf.txt:

1
2
3
cat=1
dog=0
fish=1

linux控制台输入

1
2
3
4
[root@vm1 ~]# nc -lk 9001
cat
dog
fish

idea控制台输出

1
2
3
4
5
update config:cat=1
update config:dog=0
update config:fish=1
cat被处理了
fish被处理了

修改conf.txt

1
2
3
cat=1
dog=1
fish=1

linux控制台输入

1
dog

IDEA控制台输出

1
2
3
4
update config:cat=1
update config:dog=1
update config:fish=1
dog被处理了

需求2):从kafka topic中读取各个城市的网约车最高限速配置,并实时处理司机上报的里程数据,如果超速则告警。司机里程实时上报也使用kafka topic来接收。

kafka的详细教程参见博文 《kafka详细教程》https://blog.csdn.net/hellozpc/article/details/105680217

pom依赖引入kafka连接器

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.2</version>
</dependency>

Java 代码

实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class DriverUploadInfo {
public String driverId;
/**
* 当前时间戳(毫秒)
*/
public long timestamp;
/**
* 当前里程表记录的总里程数(米)
*/
public double totalMileage;
/**
* 城市编码
*/
public String cityCode;

@Override
public String toString() {
return "DriverUploadInfo{" +
"driverId='" + driverId + '\'' +
", timestamp=" + timestamp +
", totalMileage=" + totalMileage +
", cityCode='" + cityCode + '\'' +
'}';
}
}

Flink应用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class BroadCastDemo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

//kafka配置参数
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.174.136:9092");
props.put("zookeeper.connect", "192.168.174.136:2181");
//props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

//定义MapStateDescriptor来保存要广播的数据
final MapStateDescriptor<String, Double> CONFIG_DESCRIPTOR = new MapStateDescriptor<>(
"speed_config",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO);

//配置数据流,读取配置
BroadcastStream<String> broadcastStream = env.addSource(new FlinkKafkaConsumer<>(
"city_speed_config", //kafka topic
new SimpleStringSchema(), // String 序列化
props)).broadcast(CONFIG_DESCRIPTOR);

//业务数据流,处理数据
DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<>(
"driver_upload", //kafka topic
new SimpleStringSchema(), // String 序列化
props));

KeyedStream<DriverUploadInfo, String> driverStream = dataStreamSource.map(new MapFunction<String, DriverUploadInfo>() {
@Override
public DriverUploadInfo map(String value) throws Exception {
DriverUploadInfo driverUploadInfo = JSON.parseObject(value, DriverUploadInfo.class);
return driverUploadInfo;
}
}).keyBy(new KeySelector<DriverUploadInfo, String>() {
@Override
public String getKey(DriverUploadInfo value) throws Exception {
return value.driverId;
}
});

driverStream.connect(broadcastStream).process(new KeyedBroadcastProcessFunction<Object, DriverUploadInfo, String, Object>() {
//使用ValueState保存上一次司机状态信息
private transient ValueState<DriverUploadInfo> driverState;

@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<DriverUploadInfo> descriptor = new ValueStateDescriptor<>("driverInfoState", DriverUploadInfo.class);
driverState = getRuntimeContext().getState(descriptor);//注册状态
}

@Override
public void processElement(DriverUploadInfo driverUploadInfo, ReadOnlyContext ctx, Collector<Object> out) throws Exception {
System.out.println("received event:" + driverUploadInfo);
ReadOnlyBroadcastState<String, Double> broadcastState = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
Double speedLimit = broadcastState.get(driverUploadInfo.cityCode);

DriverUploadInfo previousInfo = driverState.value();
if (previousInfo != null && speedLimit != null) {
//计算平均车速,通过里程表数差除以时间戳之差求得这段时间内的平均速度
double distance = driverUploadInfo.totalMileage - previousInfo.totalMileage;
double interval = (driverUploadInfo.timestamp - previousInfo.timestamp) / 1000;
double speed = distance / interval;
System.out.println("current speedLimit:" + speedLimit + ",current speed:" + speed);
if (speed > speedLimit) {
out.collect(new Tuple2<String, Double>(driverUploadInfo.driverId + "已超速", speed));
}
}
//更新状态
driverState.update(driverUploadInfo);
}

@Override
public void processBroadcastElement(String value, Context ctx, Collector<Object> out) throws Exception {
System.out.println("update config:" + value);
List<CityConfig> cityConfigs = JSON.parseArray(value, CityConfig.class);
BroadcastState<String, Double> broadcastState = ctx.getBroadcastState(CONFIG_DESCRIPTOR);
cityConfigs.forEach(e -> {
try {
broadcastState.put(e.cityCode, e.speedLimit);
} catch (Exception ex) {
ex.printStackTrace();
}
});
}
}).print();

env.execute("Broadcast State demo");
}
}

在linux上启动Kafka集群测试。

创建配置topic,topic名称city_speed_config

1
kafka-topics.sh --create --topic city_speed_config --bootstrap-server 192.168.174.136:9092

创建业务数据topic接收司机上报数据,topic名称driver_upload

1
kafka-topics.sh --create --topic driver_upload --bootstrap-server 192.168.174.136:9092

查看主题列表

1
kafka-topics.sh --list --bootstrap-server 192.168.174.136:9092

运行Java程序,启动Flink作业。

使用Kafka生产者脚本发送消息到主题city_speed_config:

1
kafka-console-producer.sh --topic city_speed_config --bootstrap-server 192.168.174.136:9092

输入 [{"cityCode":"320100","speedLimit":"45"},{"cityCode":"320101","speedLimit":"55"}]

使用Kafka生产者脚本发送消息到主题driver_upload :

1
kafka-console-producer.sh --topic driver_upload --bootstrap-server 192.168.174.136:9092

一条一条输入下列数据,观察IDEA控制台的输出:

1
2
3
4
5
6
7
8
9
{"driverId":"D10001","cityCode":"320100","timestamp":1607347920499,"totalMileage":1000}
{"driverId":"D10001","cityCode":"320100","timestamp":1607347921501,"totalMileage":1040}
{"driverId":"D10001","cityCode":"320100","timestamp":1607347922503,"totalMileage":1090}
{"driverId":"D10001","cityCode":"320100","timestamp":1607347923503,"totalMileage":1120}
1234
{"driverId":"D10002","cityCode":"320101","timestamp":1607347920499,"totalMileage":1000}
{"driverId":"D10002","cityCode":"320101","timestamp":1607347921501,"totalMileage":1040}
{"driverId":"D10002","cityCode":"320101","timestamp":1607347922503,"totalMileage":1095}
{"driverId":"D10002","cityCode":"320101","timestamp":1607347923503,"totalMileage":1151}

IDEA控制台输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
update config:[{"cityCode":"320100","speedLimit":"45"},{"cityCode":"320101","speedLimit":"55"}]  
received event:DriverUploadInfo{driverId='D10001', timestamp=1607347920499, totalMileage=1000.0, cityCode='320100'}
received event:DriverUploadInfo{driverId='D10001', timestamp=1607347921501, totalMileage=1040.0, cityCode='320100'}
current speedLimit:45.0,current speed:40.0
received event:DriverUploadInfo{driverId='D10001', timestamp=1607347922503, totalMileage=1090.0, cityCode='320100'}
current speedLimit:45.0,current speed:50.0
(D10001已超速,50.0)
received event:DriverUploadInfo{driverId='D10001', timestamp=1607347923503, totalMileage=1120.0, cityCode='320100'}
current speedLimit:45.0,current speed:30.0
received event:DriverUploadInfo{driverId='D10002', timestamp=1607347920499, totalMileage=1000.0, cityCode='320101'}
received event:DriverUploadInfo{driverId='D10002', timestamp=1607347921501, totalMileage=1040.0, cityCode='320101'}
current speedLimit:55.0,current speed:40.0
received event:DriverUploadInfo{driverId='D10002', timestamp=1607347922503, totalMileage=1095.0, cityCode='320101'}
current speedLimit:55.0,current speed:55.0
received event:DriverUploadInfo{driverId='D10002', timestamp=1607347923503, totalMileage=1151.0, cityCode='320101'}
current speedLimit:55.0,current speed:56.0
(D10002已超速,56.0)

重新发送配置数据到主题city_speed_config:

[{“cityCode”:“320100”,“speedLimit”:“60”},{“cityCode”:“320101”,“speedLimit”:“40”}]

再次发送业务数据到主题driver_upload 验证配置修改是否生效:

{“driverId”:“D10001”,“cityCode”:“320100”,“timestamp”:1607347920499,“totalMileage”:1000}

{“driverId”:“D10001”,“cityCode”:“320100”,“timestamp”:1607347921501,“totalMileage”:1061}

{“driverId”:“D10001”,“cityCode”:“320101”,“timestamp”:1607347922501,“totalMileage”:1112}

IDEA控制台输出

1
2
3
4
5
6
7
8
9
update config:[{"cityCode":"320100","speedLimit":"60"},{"cityCode":"320101","speedLimit":"40"}]
received event:DriverUploadInfo{driverId='D10001', timestamp=1607347920499, totalMileage=1000.0, cityCode='320100'}
current speedLimit:60.0,current speed:40.0
received event:DriverUploadInfo{driverId='D10001', timestamp=1607347921501, totalMileage=1061.0, cityCode='320100'}
current speedLimit:60.0,current speed:61.0
(D10001已超速,61.0)
received event:DriverUploadInfo{driverId='D10001', timestamp=1607347922501, totalMileage=1112.0, cityCode='320101'}
current speedLimit:40.0,current speed:51.0
(D10001已超速,51.0)

可见配置修改已经生效!至此,广播流的案例完结。

RocksDB

RocksDB是使用C++编写的嵌入式kv存储引擎,其键值均允许使用二进制流。由Facebook基于levelDB开发, 提供向后兼容的levelDB API。RocksDB使用LSM存储引擎,这也是许多非关系型数据库比如Hbase的核心思想。RocksDB支持多种压缩算法,配置灵活,存储层可以直接使用内存,使用Flash,使用硬盘或者HDFS。

以RocksDB的Java驱动为例,来演示一下RocksDB的使用。

引入pom依赖

1
2
3
4
5
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>6.13.3</version>
</dependency>

Java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.rocksdb.Options;
import org.rocksdb.RocksDB;

public class RocksDemo {
// RocksDB是由C++编写,在Java中使用首先需要加载JNI Native库
static {
// a static method that loads the RocksDB C++ library.
RocksDB.loadLibrary();
}

public static void main(String[] args) throws Exception {
// the Options class contains a set of configurable DB options that determines the behaviour of the database.
// 创建配置
Options dbOpt = new Options();
// 当数据库不存在时自动创建(路径中的上层目录需要先创建好)
dbOpt.setCreateIfMissing(true);
// 指定RocksDB文件存储位置,默认是本地文件,存储在本地磁盘;也可以使用分布式文件系统比如HDFS
RocksDB rdb = RocksDB.open(dbOpt, "d:\\data\\rocksdb");

// RocksDB以字节流形式保存数据
byte[] key = "data".getBytes();
byte[] value = "hello rocks db".getBytes();

// 使用put方法写入数据
rdb.put(key, value);
System.out.println("数据写入到RocksDB");

// 使用get方法读取数据
System.out.println("从RocksDB读取数据,key = " + new String(key) + ",value=" + new String(rdb.get(key)));

// 删除数据
rdb.delete(key);

// 关闭资源
rdb.close();
dbOpt.close();
}
}

上述入门案例演示了RocksDB的存储和查询,那么在Flink中如何使用RocksDB作为状态后端存储呢?

引入pom依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.12</artifactId>
<version>1.11.2</version>
</dependency>

设置RocksDB状态后端

1
env.setStateBackend(new RocksDBStateBackend("file:///D://data//rocksdb_flink", true));

QueryableState

上述案例中,状态后端存储的状态数据我们是没法直接读取的,都是二进制流。Flink提供了一种获取运行时状态的机制,让我们可以实时获取到Job运行时的State。这就是可查询状态(Queryable State)服务,顾名思义就是允许用户从外部系统(如业务系统)中查询Flink作业内部的状态(State)。在一些需要给外部系统开放状态查询的场景可以使用此特性。

QueryableState组件

QueryableStateClient:客户端组件,可以独立于Flink集群运行,用于提交用户查询请求。

QueryableStateClientProxy:代理组件,运行在每个TaskManager上(即在Flink集群中),负责接收客户端的请求。客户端的查询请求通常带一个key,代理组件从JobManager处查得key所属的TaskManager,将客户端请求转发到该TaskManager上运行的QueryableStateServer中,并将用户查询的状态结果转发给客户端。

QueryableStateServer:状态查询组件,运行在每个TaskManager上,负责管理本地状态,处理客户端的查询。

开启Queryable State

引入pom

1
2
3
4
5
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-queryable-state-runtime_2.12</artifactId>
<version>1.11.2</version>
</dependency>

本地代码中开启Queryable State服务

1
2
3
4
5
Configuration config = new Configuration();
config.setInteger(ConfigOptions.key("rest.port").intType().defaultValue(8081), 8081);
//开启Queryable State服务
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

在Flink中使状态可查询

有2种方式设置状态可查询。

1)QueryableStateStream

在将数据流转换为KeyedStream后调用asQueryableState()方法返回一个QueryableStateStream数据流。

需求:自定义数据源,接收用户每秒上报1个随机数。每隔10秒统计1次每个用户这10秒内上传的最大数值输出。并开启状态可查询,使用客户端每秒查一次指定用户的状态并输出。

自定义数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class MyDataSource implements SourceFunction<Tuple2<String, Integer>> {
private boolean isRunning = true;

/**
* run方法里编写数据产生逻辑,假设有3个用户,他们每秒钟上传一个数字
*
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
SecureRandom secureRandom = new SecureRandom();
while (isRunning) {
int i1 = secureRandom.nextInt(1000);
int i2 = secureRandom.nextInt(1000);
int i3 = secureRandom.nextInt(1000);
Tuple2<String, Integer> hd = new Tuple2<>("恒大", i1);
System.out.println("source time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")) + ",value:" + hd.toString());

Tuple2<String, Integer> le = new Tuple2<>("刘二", i2);
System.out.println("source time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")) + ",value:" + le.toString());

Tuple2<String, Integer> zs = new Tuple2<>("张三", i3);
System.out.println("source time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")) + ",value:" + zs.toString());

ctx.collect(hd);
ctx.collect(le);
ctx.collect(zs);
Thread.sleep(1000);
}
}

@Override
public void cancel() {
isRunning = false;
}
}

flink应用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class QueryableState {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.setInteger(ConfigOptions.key("rest.port").intType().defaultValue(8081), 8081);
//开启Queryable State服务
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.addSource(new MyDataSource());
SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).timeWindow(Time.seconds(10))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
@Override
public void apply(String o, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
List<Tuple2<String, Integer>> items = new ArrayList<>();
Iterator<Tuple2<String, Integer>> iterator = input.iterator();
while (iterator.hasNext()) {
items.add(iterator.next());
}
Collections.sort(items, Comparator.comparing(e -> e.f1));
Tuple2<String, Integer> max = items.get(items.size() - 1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println(("window[" + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd()) + "],max:" + max));
out.collect(max);
}
});
result.print();

result.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).asQueryableState("MaxValueIn10s");//asQueryableState使得结果的状态可查
env.execute("queryableState demo");
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class QueryClient1 {
public static void main(String[] args) throws Exception {
//jobID是运行上述flink作业时控制台输出的
String jobID = "db6ba9ca3bcd15cf7f06da4122ea975d";
final JobID jobId = JobID.fromHexString(jobID);
System.out.println("jobId:" + jobId);
final String jobManagerHost = "127.0.0.1";
final int jobManagerPort = 9069;

QueryableStateClient client = new QueryableStateClient(jobManagerHost, jobManagerPort);

//用状态描述符来描述我们要查询的状态
ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("maxvalue", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
}));

//查询指定用户的状态,key对应keyBy的元素类型
//比如这里每隔1秒查询Flink中恒大这个用户的状态
final String key = "恒大";

while (true) {
CompletableFuture<ValueState<Tuple2<String, Integer>>> completableFuture =
client.getKvState(
jobId,
"MaxValueIn10s",
key,
BasicTypeInfo.STRING_TYPE_INFO,
stateDescriptor);
System.out.println(completableFuture.get().value());
//每隔1秒查询一次
Thread.sleep(1000);
}
}
}

2)StateDescriptor

通过ValueStateDescriptor.setQueryable()方法开放此状态,使此状态可查询。还是以上面的需求举例。

改动点在于不使用KeyedStream的asQueryableState()方法,而是在状态描述符上直接设置状态可查询。

flink应用代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class QueryableState2 {
public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
config.setInteger(ConfigOptions.key("rest.port").intType().defaultValue(8081), 8081);
//开启Queryable State服务
config.setBoolean(QueryableStateOptions.ENABLE_QUERYABLE_STATE_PROXY_SERVER, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.addSource(new MyDataSource());
SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
}).timeWindow(Time.seconds(10))
.apply(new RichWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
// 定义一个ValueState,来存放状态
private transient ValueState<Tuple2<String, Integer>> maxValueState;

@Override
public void open(Configuration parameters) throws Exception {
//用状态描述符来描述我们要查询的状态
ValueStateDescriptor<Tuple2<String, Integer>> stateDescriptor = new ValueStateDescriptor<Tuple2<String, Integer>>("maxvalue", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
}));
//在状态描述符上直接设置状态可查询
stateDescriptor.setQueryable("MaxValueIn10s");
maxValueState = getRuntimeContext().getState(stateDescriptor);
}

@Override
public void apply(String o, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
List<Tuple2<String, Integer>> items = new ArrayList<>();
Iterator<Tuple2<String, Integer>> iterator = input.iterator();
while (iterator.hasNext()) {
items.add(iterator.next());
}
Collections.sort(items, Comparator.comparing(e -> e.f1));
Tuple2<String, Integer> max = items.get(items.size() - 1);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
System.out.println(("window[" + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd()) + "],max:" + max));
//更新状态值
maxValueState.update(max);
out.collect(max);
}

});
result.print();

env.execute("queryableState demo");
}
}

客户端代码没有变化,只要改变jobId的值,依然能够查询到指定用户的实时state值。

实战案例

网页UV统计

web应用中将用户对每个页面的请求日志发送到Kafka中,Flink从Kafka中消费日志数据,并解析日志、统计每个页面当日独立用户访问量(UV),最后将统计结果输出到redis中供其它服务查询。

日志数据实体类

1
2
3
4
5
6
7
public class UserVisitEvent {
public String traceId;//日志id
public String date;//日志日期,如:20201221
public Integer pageId;//web页面id
public String uid;//用户id
public String url;//页面url
}

Flink应用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
public class UVdemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//kafka配置参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.129:9092");
props.put("group.id", "web-uv-stat");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
FlinkKafkaConsumer011<String> kafkaConsumer011 = new FlinkKafkaConsumer011<>(
"uv_statistics", //kafka topic
new SimpleStringSchema(), // String 序列化
props);

FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig
.Builder().setHost("127.0.0.1").setPort(6379).build();

env.addSource(kafkaConsumer011)
.setParallelism(2)
.map(str -> {
//解析日志时间戳,转换为日期
Map<String, String> o = JSON.parseObject(str, Map.class);
String timestamp = o.get("timestamp");
Instant instant = Instant.ofEpochMilli(Long.parseLong(timestamp));
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
UserVisitEvent userVisitEvent = JSON.parseObject(str, UserVisitEvent.class);
userVisitEvent.date = localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
return userVisitEvent;
})
.keyBy("date", "pageId")//按照日期和页面id分组,每个页面用独立的KeyedState来存储数据
.map(new RichMapFunction<UserVisitEvent, Tuple2<String, Long>>() {
// 存储userId集合,使用MapState存储key即可,value不使用
private MapState<String, String> userIdSet;
// MapState没有统计元素个数的方法,所以需要一个存储UV值的状态
private ValueState<Long> uvCount;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 从状态中恢复userIdSet状态
userIdSet = getRuntimeContext().getMapState(
new MapStateDescriptor<>("userIdSet",
TypeInformation.of(new TypeHint<String>() {
}),
TypeInformation.of(new TypeHint<String>() {
})));
// 从状态中恢复uvCount状态
uvCount = getRuntimeContext().getState(new ValueStateDescriptor<>("uvCount", TypeInformation.of(new TypeHint<Long>() {
})));
}

@Override
public Tuple2<String, Long> map(UserVisitEvent userVisitEvent) throws Exception {
// 设置uvCount初始值
if (null == uvCount.value()) {
uvCount.update(0L);
}
if (!userIdSet.contains(userVisitEvent.uid)) {
userIdSet.put(userVisitEvent.uid, null);
//用户首次访问才更新uvCount计数
uvCount.update(uvCount.value() + 1);
}
// 生成写入redis的数据,格式:key=date_pageId,value=count值
String redisKey = userVisitEvent.date + "_"
+ userVisitEvent.pageId;
System.out.println(redisKey + ":" + uvCount.value());
return Tuple2.of(redisKey, uvCount.value());
}
})
.addSink(new RedisSink<>(redisConf, new RedisSinkMapper()));

env.execute("UV Statistics");
}

private static class RedisSinkMapper implements RedisMapper<Tuple2<String, Long>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}

@Override
public String getKeyFromData(Tuple2<String, Long> data) {
return data.f0;
}

@Override
public String getValueFromData(Tuple2<String, Long> data) {
return data.f1.toString();
}
}
}

创建topic

1
[root@vm1 ~]# kafka-topics.sh --create --zookeeper 192.168.174.136:2181 --replication-factor 2 -partitions 2 --topic uv_statistics

运行Flink应用,使用kafka生产者脚本向topic发送数据。

1
2
3
4
5
6
7
8
9
10
11
kafka-console-producer.sh --topic uv_statistics --bootstrap-server 192.168.174.136:9092
>{"timestamp":"1607695647643","pageId":1001,"traceId":"tid-11","uid":"U0001","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1001,"traceId":"tid-11","uid":"U0001","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1001,"traceId":"tid-11","uid":"U0002","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1001,"traceId":"tid-11","uid":"U0001","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1001,"traceId":"tid-11","uid":"U0002","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1001,"traceId":"tid-11","uid":"U0002","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1001,"traceId":"tid-11","uid":"U0003","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1002,"traceId":"tid-11","uid":"U0003","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1002,"traceId":"tid-11","uid":"U0003","url":"/user/login"}
>{"timestamp":"1607695647643","pageId":1002,"traceId":"tid-11","uid":"U1442","url":"/user/login"}

可以看到flink应用程序输出正常

1
2
3
4
5
6
7
20201211_1001:1
20201211_1001:2
20201211_1001:2
20201211_1001:3
20201211_1002:1
20201211_1002:1
20201211_1002:2

同时redis里面也能查询到最新的统计数据。打开redis客户端,执行下列命令获取key的值

1
2
3
4
127.0.0.1:6379> get 20201211_1002
"2"
127.0.0.1:6379> get 20201211_1001
"3"

上传jar到flink集群中运行

上述程序是在本地运行的,也可以提交到Flink集群中运行。Flink应用程序打包方式以及如何提交到flink集群参见第一篇的快速入门案例。提交之前确保kafka和redis都已经启动,且能正常连接。
在这里插入图片描述

由此可见,在集群多并行度环境下,依然正常输出正确的结果!

远程提交Flink应用

只要像下面这样修改运行环境,运行主函数所在的类即可远程提交flink任务:

1
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.174.136", 8081, "F:\\flink\\flink_job_uv\\target\\flink_job_uv-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

提交到flink集群中运行后,本地idea关闭flink应用程序不会影响已经提交到flink集群中的job。即已提交到集群中运行的job不会被终止。注意,提交job时要把flink应用程序中用到的其它类所在的jar包带上。即flink应用程序所依赖的类库要打包成jar包传入,否则会报各种类找不到的异常:ClassNotFoundException

在实际生产中,可以先将flink应用程序工程打成jar。通过springboot应用在程序启动时提交flink任务。springboot应用程序也打成jar,这样就可以通过运行springboot应用启动flink计算任务。

springboot启动flink任务

pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<exclusions>
<!-- 排除自带的logback依赖 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.11.2</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.11.2</version>
<!-- <scope>provided</scope>-->
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.11.2</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.11.2</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>

<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
</dependencies>

<build>
<plugins>
<!-- 编译插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- scala编译插件,用于将scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.4.0</version>
<configuration>
<scalaCompatVersion>2.11</scalaCompatVersion>
<scalaVersion>2.11.12</scalaVersion>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- 打jar包插件(会包含所有依赖) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<!-- 可以设置jar包的入口类(可选) -->
<mainClass>com.bigbird.flink_job_uv.FlinkJobUvApplication</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Springboot打包插件 -->
<!-- <plugin>-->
<!-- <groupId>org.springframework.boot</groupId>-->
<!-- <artifactId>spring-boot-maven-plugin</artifactId>-->
<!-- <version>2.2.6.RELEASE</version>-->
<!-- </plugin>-->
</plugins>
</build>
SpringBoot入口类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@SpringBootApplication
public class FlinkJobUvApplication implements ApplicationListener<ContextRefreshedEvent> {

public static void main(String[] args) {
SpringApplication.run(FlinkJobUvApplication.class, args);
}

@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
System.out.println("submit flink job...");
UVdemo.submitJob();
} catch (Exception e) {
e.printStackTrace();
}
}
}
Flink应用程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public class UVdemo {
public static void submitJob() throws Exception {
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.174.136", 8081, "F:\\flink\\flink_job_uv\\flink_job_uv-0.0.1-SNAPSHOT-jar-with-dependencies.jar");

//kafka配置参数
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.136:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "web-uv-stat");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
FlinkKafkaConsumer<String> kafkaConsumer011 = new FlinkKafkaConsumer<>(
"uv_statistics", //kafka topic
new SimpleStringSchema(), // String 序列化
props);

FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig
.Builder().setHost("192.168.174.1").setPort(6379).build();

env.addSource(kafkaConsumer011)
.map(new MapFunction<String, UserVisitEvent>() {
@Override
public UserVisitEvent map(String str) throws Exception {
//解析日志时间戳,转换为日期
Map<String, String> o = JSON.parseObject(str, Map.class);
String timestamp = o.get("timestamp");
UserVisitEvent userVisitEvent = new UserVisitEvent();
if (timestamp != null) {
Instant instant = Instant.ofEpochMilli(Long.parseLong(timestamp));
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
userVisitEvent = JSON.parseObject(str, UserVisitEvent.class);
userVisitEvent.date = localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
} else {
System.out.println("timestamp不能为空");
}

return userVisitEvent;
}
})
.keyBy("date", "pageId")//按照日期和页面id分组,每个页面用独立的KeyedState来存储数据
.map(new RichMapFunction<UserVisitEvent, Tuple2<String, Long>>() {
// 存储userId集合,使用MapState存储key即可,value不使用
private MapState<String, String> userIdSet;
// MapState没有统计元素个数的方法,所以需要一个存储UV值的状态
private ValueState<Long> uvCount;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 从状态中恢复userIdSet状态
userIdSet = getRuntimeContext().getMapState(
new MapStateDescriptor<>("userIdSet",
TypeInformation.of(new TypeHint<String>() {
}),
TypeInformation.of(new TypeHint<String>() {
})));
// 从状态中恢复uvCount状态
uvCount = getRuntimeContext().getState(new ValueStateDescriptor<>("uvCount", TypeInformation.of(new TypeHint<Long>() {
})));
}

@Override
public Tuple2<String, Long> map(UserVisitEvent userVisitEvent) throws Exception {
// 设置uvCount初始值
if (null == uvCount.value()) {
uvCount.update(0L);
}
if (!userIdSet.contains(userVisitEvent.uid)) {
userIdSet.put(userVisitEvent.uid, null);
//用户首次访问才更新uvCount计数
uvCount.update(uvCount.value() + 1);
}
// 生成写入redis的数据,格式:key=date_pageId,value=count值
String redisKey = userVisitEvent.date + "_"
+ userVisitEvent.pageId;
System.out.println(redisKey + ":" + uvCount.value());
return Tuple2.of(redisKey, uvCount.value());
}
})
.addSink(new RedisSink<>(redisConf, new RedisSinkMapper()));

env.execute("UV Statistics_demo");
}

private static class RedisSinkMapper implements RedisMapper<Tuple2<String, Long>> {
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.SET);
}

@Override
public String getKeyFromData(Tuple2<String, Long> data) {
return data.f0;
}

@Override
public String getValueFromData(Tuple2<String, Long> data) {
return data.f1.toString();
}
}
}

注意,flink应用程序中传入的jar路径是Flink应用程序所有的依赖库打成的jar包。因此是先将flink应用程序及其依赖库打jar包,存放到指定位置。再将SpringBoot工程打成jar包。因此pom文件中有两种打包插件,分别用于打包flink应用程序和Springboot。

打包命令:mvn package -DskipTests

运行SpringBoo应用即可提交flink任务到集群中运行。

1
java -jar flink_job_uv-0.0.1-SNAPSHOT.jar

取消任务可以在Flink web页面(笔者的flink集群web地址是http://vm1:8081/ )或者linux命令行操作。

SpringBoot直接停止、取消flink任务的实现方式欢迎留言!

头条实时热榜统计

需求描述:每5分钟统计一次1小时内南京地区的热点新闻,输出topN条热门新闻。新闻点击阅读数据来自kafka消息队列(由业务系统或者ETL系统写入处理过的数据到kafka中)。

输入数据实体类

1
2
3
4
5
6
7
8
9
10
/**
* 新闻数据
*/
public class NewsInfo {
public String newsId;
public int categoryId;
public String city;
public String content;
public long timestamp;
}

计数数据实体类

1
2
3
4
5
6
7
8
/**
* 某个窗口内特定新闻的计数值
*/
public class NewsWindowCount {
public String newsId;
public long windowEnd;
public long count;
}

Flink应用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
* 热榜统计:每5分钟统计1次1小时内南京地区的热点新闻,输出头条新闻
*/
public class HotListStatistics {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

//简单起见,数据也可从文本文件中读取;并从数据中提取时间戳生成水位线
//DataStreamSource<String> inputStreamSource = env.readTextFile("D:\\news.txt");
//从kafka接收输入数据
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.244.128:9092");
props.put("group.id", "news-group");//指定消费者组
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest"); //value 反序列化

DataStreamSource<String> inputStreamSource = env.addSource(new FlinkKafkaConsumer<>("topic_hotlist", new SimpleStringSchema(), props));

DataStream<NewsInfo> dataStream = inputStreamSource.map(line -> {
String[] split = line.split(",");
NewsInfo newsInfo = new NewsInfo();
newsInfo.newsId = split[0];
newsInfo.categoryId = Integer.parseInt(split[1]);
newsInfo.city = split[2];
newsInfo.content = split[3];
newsInfo.timestamp = Long.parseLong(split[4]);
return newsInfo;
})
//由于输入数据是单调递增的时间戳,可以直接使用Flink提供的水位线生成器MonotonousTimestamps,也可以使用固定延迟的水位线
.assignTimestampsAndWatermarks(WatermarkStrategy.<NewsInfo>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.timestamp));

//过滤南京地区的热榜;分组开窗聚合,得到每个窗口内各个新闻的count值
DataStream<NewsWindowCount> aggregateStream = dataStream.filter(e -> e.city.equals("南京"))
.keyBy(e -> e.newsId)//按照新闻id分组
.timeWindow(Time.hours(1), Time.minutes(5))//一小时的窗口,每隔5分滑动一次
.aggregate(new CountAggregation(), new ItemWindowCountFunction());

//将同一窗口内的新闻按计数值排序,统计topN热点新闻(使用状态编程、定时器)
DataStream<String> processRes = aggregateStream
.keyBy("windowEnd")
.process(new TopNHotListFunction(3));

processRes.print();

env.execute("hot list");
}

/**
* 自定义增量聚合函数,完成每个新闻热点的计数累加功能
*/
static class CountAggregation implements AggregateFunction<NewsInfo, Long, Long> {

@Override
public Long createAccumulator() {
//初始值为0
return 0L;
}

@Override
public Long add(NewsInfo value, Long accumulator) {
//每来一次相同的新闻点击数据,计数加一
return accumulator + 1;
}

@Override
public Long getResult(Long accumulator) {
return accumulator;
}

@Override
public Long merge(Long a, Long b) {
return a + b;
}
}

/**
* 自定义全量窗口聚合函数,输出封装好的ItemWindowCount
*/
static class ItemWindowCountFunction implements WindowFunction<Long, NewsWindowCount, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<Long> input, Collector<NewsWindowCount> out) throws Exception {
int size = 0;
Iterator<Long> iterator = input.iterator();
while (iterator.hasNext()) {
iterator.next();
size++;
}
System.out.println("size is:" + size);

NewsWindowCount newsWindowCount = new NewsWindowCount();
//这里input迭代器里只有一个元素
newsWindowCount.count = input.iterator().next();
newsWindowCount.newsId = key;
newsWindowCount.windowEnd = window.getEnd();
out.collect(newsWindowCount);
}
}

static class TopNHotListFunction extends KeyedProcessFunction<Tuple, NewsWindowCount, String> {
//输出topN的结果数据
private int topN;

//先定义状态变量,在生命周期方法里获取
private ListState<NewsWindowCount> newsWindowCountListState = null;

public TopNHotListFunction(int topN) {
this.topN = topN;
}

@Override
public void open(Configuration parameters) throws Exception {
newsWindowCountListState = getRuntimeContext().getListState(
new ListStateDescriptor<>("newsCountListState", NewsWindowCount.class));
}

@Override
public void processElement(NewsWindowCount value, Context ctx, Collector<String> out) throws Exception {
//每来一个数据直接加入状态ListState中
newsWindowCountListState.add(value);
//注册一个windowEnd+1毫秒触发的定时器
ctx.timerService().registerEventTimeTimer(value.windowEnd + 1);
}

/**
* 当所有所有的窗口统计结果都到达,触发定时器,输出排序
*
* @param timestamp
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
//将ListState中的数据放到可排序的数据结构里面
List<NewsWindowCount> newsWindowCounts = new ArrayList<>();
Iterator<NewsWindowCount> iterator = newsWindowCountListState.get().iterator();
while (iterator.hasNext()) {
newsWindowCounts.add(iterator.next());
}
//清除状态,节省内存
newsWindowCountListState.clear();
newsWindowCounts.sort(new Comparator<NewsWindowCount>() {
@Override
public int compare(NewsWindowCount o1, NewsWindowCount o2) {
if (o1.count == o2.count) {
return 0;
} else if (o1.count > o2.count) {
return -1;
} else {
return 1;
}
}
});

//将新闻排名格式化输出
StringBuffer outputStr = new StringBuffer();
outputStr.append("windowEndTime:").append(new Timestamp(timestamp - 1)).append("\r\n");
for (int i = 0; i < Math.min(newsWindowCounts.size(), topN); i++) {
NewsWindowCount newsWindowCount = newsWindowCounts.get(i);
outputStr.append("No.").append(i + 1)
.append(" 新闻ID:").append(newsWindowCount.newsId).append("\t")
.append(" 点击量:").append(newsWindowCount.count)
.append("\r\n");
}
out.collect(outputStr.toString());
}
}
}

准备数据news.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
100001,1,南京,鸟哥发财了,1608285818351,20201218 18:03:38:351
100002,1,南京,李晶发财了,1608285878351,20201218 18:04:38:351
100002,1,北京,李晶发财了,1608285900000,20201218 18:05:00:000
100003,1,南京,衡总发财了,1608285900002,20201218 18:05:00:002

100001,1,南京,鸟哥发财了,1608285938351,20201218 18:05:38:351
100005,1,南京,军哥发财了,1608286058351,20201218 18:07:38:351
100005,1,南京,军哥发财了,1608286178351,20201218 18:09:38:351
100005,1,南京,军哥发财了,1608286200000,20201218 18:10:00:000
100001,1,南京,鸟哥发财了,1608286200002,20201218 18:10:00:002
100003,1,南京,衡总发财了,1608286298351,20201218 18:11:38:351

100002,1,南京,李晶发财了,1608289200000,20201218 19:00:00:000
100002,1,南京,李晶发财了,1608289200002,20201218 19:00:00:002
100002,1,南京,李晶发财了,1608289500002,20201218 19:00:05:002

分别运行flink应用程序、kafka生产者工具,并发送第一条消息

1
2
3
kafka-topics.sh --create --zookeeper 192.168.174.136:2181 --replication-factor 2 -partitions 2 --topic topic_hotlist
kafka-console-producer.sh --topic topic_hotlist --bootstrap-server 192.168.174.136:9092
>100001,1,南京,鸟哥发财了,1608285818351

flink应用程序收到第一条数据时没有输出,直到收到第4条时间戳为1608285900002的数据才触发计算,此时数据的时间戳正好比window的endTime 18:05:00加1毫秒大,因此触发计算。但是输出的统计结果不含时间戳为1608285900000的这条边界数据,因此结果如下:

输入:

1
2
3
>100002,1,南京,李晶发财了,1608285878351
>100002,1,北京,李晶发财了,1608285900000
>100003,1,南京,衡总发财了,1608285900002

输出:

1
2
3
windowEndTime:2020-12-18 18:05:00.0
No.1 新闻ID:100001 点击量:1
No.2 新闻ID:100002 点击量:1

以此类推,时间戳为1608286200001的数据到达时,再次触发计算,累计统计前一小时的数据。

输入:

1
2
3
4
5
>100001,1,南京,鸟哥发财了,1608285938351
>100005,1,南京,军哥发财了,1608286058351
>100005,1,南京,军哥发财了,1608286178351
>100005,1,南京,军哥发财了,1608286200000
>100001,1,南京,鸟哥发财了,1608286200002

输出:

1
2
3
4
windowEndTime:2020-12-18 18:10:00.0
No.1 新闻ID:100005 点击量:2
No.2 新闻ID:100001 点击量:2
No.3 新闻ID:100002 点击量:1

当时间戳跳变到19:00:00:000时,之前的窗口全部触发计算。因此输出统计结果如下:

输入:

1
2
>100003,1,南京,衡总发财了,1608286298351
>100002,1,南京,李晶发财了,1608289200000

输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
windowEndTime:2020-12-18 18:15:00.0
No.1 新闻ID:100001 点击量:3
No.2 新闻ID:100005 点击量:3
No.3 新闻ID:100002 点击量:1

windowEndTime:2020-12-18 18:20:00.0
No.1 新闻ID:100005 点击量:3
No.2 新闻ID:100001 点击量:3
No.3 新闻ID:100002 点击量:1

windowEndTime:2020-12-18 18:25:00.0
No.1 新闻ID:100005 点击量:3
No.2 新闻ID:100001 点击量:3
No.3 新闻ID:100002 点击量:1

windowEndTime:2020-12-18 18:30:00.0
No.1 新闻ID:100001 点击量:3
No.2 新闻ID:100005 点击量:3
No.3 新闻ID:100002 点击量:1

windowEndTime:2020-12-18 18:35:00.0
No.1 新闻ID:100001 点击量:3
No.2 新闻ID:100005 点击量:3
No.3 新闻ID:100002 点击量:1

windowEndTime:2020-12-18 18:40:00.0
No.1 新闻ID:100005 点击量:3
No.2 新闻ID:100001 点击量:3
No.3 新闻ID:100002 点击量:1

windowEndTime:2020-12-18 18:45:00.0
No.1 新闻ID:100001 点击量:3
No.2 新闻ID:100005 点击量:3
No.3 新闻ID:100002 点击量:1

windowEndTime:2020-12-18 18:50:00.0
No.1 新闻ID:100001 点击量:3
No.2 新闻ID:100005 点击量:3
No.3 新闻ID:100003 点击量:1

windowEndTime:2020-12-18 18:55:00.0
No.1 新闻ID:100001 点击量:3
No.2 新闻ID:100005 点击量:3
No.3 新闻ID:100003 点击量:1

输入>100002,1,南京,李晶发财了,1608289200002时,

由于时间戳对应19:00:00:002,触发了window endTime 为19:00:00.0的窗口:

1
2
3
4
windowEndTime:2020-12-18 19:00:00.0
No.1 新闻ID:100001 点击量:3
No.2 新闻ID:100005 点击量:3
No.3 新闻ID:100003 点击量:1

输入最后一条数据100002,1,南京,李晶发财了,1608289500002时,输出如下:

1
2
3
4
windowEndTime:2020-12-18 19:05:00.0
No.1 新闻ID:100005 点击量:3
No.2 新闻ID:100002 点击量:2
No.3 新闻ID:100001 点击量:2

19:05:00.0统计的前1小时的区间为[18:05:00.0,19:05:00.0)

订单超时检测

在介绍实际案例之前,先补充一下CEP的理论知识。

CEP简介

CEP是复杂事件处理(Complex Event Processing)的缩写。在Flink中,CEP用于在事件流中检测匹配事件模式,即对一个或多个简单事件构成的事件流进行一定规则地匹配,然后输出满足条件的复杂事件,也就是最终用户想要的结果。

Flink CEP提供了一组Pattern API,对输入流数据进行复杂事件规则定义,用来提取符合规则的事件序列。

Pattern API的使用步骤:

1)定义一个Pattern

2)将创建好的Pattern应用到输入事件流上

3)匹配出满足模式的事件序列,得到结果

Pattern API的分类:

1)个体模式(Individual Patterns)

组成复杂规则的单个独立的模式定义,比如:start.times(3).where(new SimpleCondition(){…})

个体模式又分 singleton(单例)模式和looping(循环)模式。单例模式只能接收一个事件而循环模式可以接收多个。

可以在个体模式后面追加量词,指定循环次数。

比如:

start.times(4) //匹配出现4次

start.times(2,4) //匹配出现2,3或者4次

每个模式都需要指定触发条件,作为事件模式匹配的依据。个体模式可以通过.where() .or()和.until()来指定条件

2)组合模式(Combining Patterns)

由一个个个体模式组合而成,进而形成一整个模式序列。组合模式必须以一个初始模式开始,比如:

Pattern<Event,Event> start=Pattern.begin(“start”)

3)模式组(Groups of patterns)

将一个模式序列作为条件嵌套在个体模式里构成一组模式

近邻模式

1)严格近邻(Strict Contigutity)

要求所有事件按照严格的顺序出现,中间没有任何不匹配的事件,使用.next()指定。例如对于模式“a next b”,事件序列

[a,c,b]并未匹配。

2)宽松近邻(Relaxed Contigutity)

允许中间出现不匹配的事件,由.followedBy()指定。例如对于模式“a followedBy b”,事件[a,c,b]匹配。

3)不确定性宽松近邻(Non-Deterministic Relaxed Contigutity)

进一步放宽条件,之前匹配过的条件还可以再次使用。由followedByAny()指定。例如对于模式“a followedByAny b”,事件序列

[a,c,b,b]匹配两次,分别为{a,b},{a,b}。

4)不允许某种近邻关系

.notNext() 不允许某个事件严格紧邻前一个事件发生

.notFollowedBy() 不想允许某个事件在两个事件之间发生

注意:

所有模式必须以.begin()开始

模式序列不能以.notFollowedBy()结束

not模式不允许使用optional修饰

可以为模式指定时间约束,指定多长时间内匹配有效,比如:

next.within(Time.seconds(5))

模式检测

1)一般模式检测

模式确定后,应用到输入流,用于模式匹配得到一个PatternStream

DataStream input=…

Pattern<Event,Event> pattern=Pattern.begin(“start”).where(…)…

PatternStream patternStream=CEP.pattern(input,pattern);

匹配事件的提取

在得到PatternStream之后,可以使用select或者fastselect方法从检测到的事件序列中提取事件。

select方法使用参数Map,List>来接收匹配到的事件序列,Map的key为模式的名称,value是所有匹配的事件的列表。

2)超时模式检测

select方法和fastselect方法可以指定超时处理程序,超时处理程序会接收到目前为止由模式匹配到的所有事件,可以指定一个OutputTag定义接收到的超时事件序列。

订单超时检测案例

1)导入pom依赖

1
2
3
4
5
6
7
8
9
10
11
12
<properties>
<java.version>1.8</java.version>
<flink.version>1.11.1</flink.version>
<scala.version>2.12</scala.version>
<kafka.version>2.2.0</kafka.version>
</properties>
123456
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_${scala.version}</artifactId>
<version>${flink.version}</version>
</dependency>

2)定义订单事件实体和结果实体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class OrderEvent {
private String orderId;
private String eventType;
private String transactId;
private Long timeStamp;

public OrderEvent() {

}

public OrderEvent(String orderId, String eventType, String transactId, Long timeStamp) {
this.orderId = orderId;
this.eventType = eventType;
this.transactId = transactId;
this.timeStamp = timeStamp;
}
//省略getter/setter方法/toString方法
}
123456789101112131415161718
public class OrderRes {
private String orderId;
private String resultState;

public OrderRes() {
}

public OrderRes(String orderId, String resultState) {
this.orderId = orderId;
this.resultState = resultState;
}
//省略getter/setter方法/toString方法
}
12345678910111213

3)flink应用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class OrderPayTimeOutDetection {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

//读取数据源,转换为实体类
URL resource = OrderPayTimeOutDetection.class.getResource("/orderLog.txt");
DataStreamSource<String> dataStreamSource = env.readTextFile(resource.getPath());
DataStream<OrderEvent> orderEventStream = dataStreamSource.map(line -> {
String[] split = line.split(",");
return new OrderEvent(split[0], split[1], split[2], new Long(split[3]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
@Override
public long extractAscendingTimestamp(OrderEvent orderEvent) {
return orderEvent.getTimeStamp() * 1000L;
}
});

//定义模式
Pattern<OrderEvent, OrderEvent> orderPayPattern = Pattern.<OrderEvent>begin("create").where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return orderEvent.getEventType().equals("create");
}
})
//宽松近邻,只要下单之后又支付操作即可
.followedBy("pay").where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return orderEvent.getEventType().equals("pay");
}
})
//15分钟之类必须支付完成
.within(Time.minutes(15));

//使用侧输出流标签表示超时事件
OutputTag<OrderRes> orderTimeoutTag = new OutputTag<OrderRes>("order-timeout") {
};

//将Pattern应用到输入事件流上得到PatternStream
PatternStream<OrderEvent> patternStream = CEP.pattern(orderEventStream.keyBy(OrderEvent::getOrderId), orderPayPattern);

//使用select方法实现对匹配事件和超时事件的处理
SingleOutputStreamOperator<OrderRes> resStream = patternStream.select(orderTimeoutTag, new OrderTimeOutSelection(), new OrderPayedSelection());

resStream.print("payed ok");
resStream.getSideOutput(orderTimeoutTag).print("payed timeout");

env.execute("Order payTimeOut dDetection");
}

//超时事件处理
private static class OrderTimeOutSelection implements PatternTimeoutFunction<OrderEvent, OrderRes> {
@Override
public OrderRes timeout(Map<String, List<OrderEvent>> pattern, long timeoutTimestamp) throws Exception {
//由于是单例模式,可以直接取第一个元素
String orderId = pattern.get("create").get(0).getOrderId();
return new OrderRes(orderId, "timeout:" + timeoutTimestamp);
}
}

//正常事件匹配
private static class OrderPayedSelection implements PatternSelectFunction<OrderEvent, OrderRes> {
@Override
public OrderRes select(Map<String, List<OrderEvent>> pattern) throws Exception {
String orderId = pattern.get("pay").get(0).getOrderId();
return new OrderRes(orderId, "payed");
}
}
}

4)测试数据orderLog.txt

1
2
3
4
5
6
7
8
9
10
11
O9850001,create,,1610543460
O9850002,create,,1610543520
O9850003,create,,1610543580
O9850002,pay,3c586d2g8,1610543580
O9850004,create,,1610543760
O9850001,pay,6c18cs3e8,1610544358
O9850005,create,,1610544361
O9850004,pay,fc1gce2g8,1610543760
O9850003,pay,1c18us3e2,1610544580
O9850006,create,,1610544600
O9850005,pay,gc6g3st15,1610544600

上述订单数据中包含订单号、订单状态、支付回码、时间戳,可以发现,订单O9850003、O9850006的支付时间超过15分钟或者未支付。运行程序,验证猜想。

输出结果:

1
2
3
4
5
6
payed ok> OrderRes{orderId=O9850001, resultState='payed'}
payed timeout> OrderRes{orderId=O9850006, resultState='timeout:1610545500000'}
payed ok> OrderRes{orderId=O9850005, resultState='payed'}
payed ok> OrderRes{orderId=O9850004, resultState='payed'}
payed timeout> OrderRes{orderId=O9850003, resultState='timeout:1610544480000'}
payed ok> OrderRes{orderId=O9850002, resultState='payed'}

订单系统实时对账

上一个案例中已经有了订单支付状态事件数据,再结合到账事件数据,两者比对就可以进行对账操作。

只要两个数据的支付码能够匹配就是比对成功

1)准备数据payCallBackLog.txt

和上文案例中的orderLog.txt对比可以发现,payCallBackLog.txt最后两条数据没有匹配的,orderLog.txt最后一条没有匹配的。

1
2
3
4
5
6
3c586d2g8,yqb,1610543460
6c18cs3e8,wechat,1610543520
fc1gce2g8,yqb,1610543580
1c18us3e2,yqb,1610543590
2c53cw938,wechat,1610543760
gc6g3st15,wechat,1610543860

2)OrderPayCallBackEvent实体

1
2
3
4
5
6
7
8
9
10
11
12
public class OrderPayCallBackEvent {
private String transactId;
private String payChannel;
private Long timeStamp;

public OrderPayCallBackEvent(String transactId, String payChannel, Long timeStamp) {
this.transactId = transactId;
this.payChannel = payChannel;
this.timeStamp = timeStamp;
}
//省略getter/setter方法/toString方法
}

3)flink对账应用程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
/**
* 支付对账
*/
public class PayReconcile {

//定义侧输出流标签保存对账不匹配的数据
//订单支付事件
private final static OutputTag<OrderEvent> unmatchedOrderPay = new OutputTag<OrderEvent>("unmatched_order_pay") {
};

//到账事件
private final static OutputTag<OrderPayCallBackEvent> unmatchedOrderPayCallBack = new OutputTag<OrderPayCallBackEvent>("unmatched_order_pay_callback") {
};

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);

//读取数据源,转换为实体类
URL orderEventResource = PayReconcile.class.getResource("/orderLog.txt");
DataStreamSource<String> dataStreamSource1 = env.readTextFile(orderEventResource.getPath());
DataStream<OrderEvent> orderEventStream = dataStreamSource1.map(line -> {
String[] split = line.split(",");
return new OrderEvent(split[0], split[1], split[2], new Long(split[3]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderEvent>() {
@Override
public long extractAscendingTimestamp(OrderEvent orderEvent) {
return orderEvent.getTimeStamp() * 1000L;
}
}).filter(data -> !"".equals(data.getTransactId()));//过滤得到pay事件

URL orderPayResource = PayReconcile.class.getResource("/payCallBackLog.txt");
DataStreamSource<String> dataStreamSource2 = env.readTextFile(orderPayResource.getPath());
DataStream<OrderPayCallBackEvent> orderPayEventStream = dataStreamSource2.map(line -> {
String[] split = line.split(",");
return new OrderPayCallBackEvent(split[0], split[1], new Long(split[2]));
}).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<OrderPayCallBackEvent>() {
@Override
public long extractAscendingTimestamp(OrderPayCallBackEvent orderPayEvent) {
return orderPayEvent.getTimeStamp() * 1000L;
}
});

//将2条流进行合流操作,检查匹配项
SingleOutputStreamOperator<Tuple2<OrderEvent, OrderPayCallBackEvent>> resStream = orderEventStream.keyBy(OrderEvent::getTransactId)
.connect(orderPayEventStream.keyBy(OrderPayCallBackEvent::getTransactId))
.process(new PayMatchDetection());

resStream.print("matched_pays");
resStream.getSideOutput(unmatchedOrderPay).print("unmatched-orders-pay");
resStream.getSideOutput(unmatchedOrderPayCallBack).print("unmatched-order-pays-callback");

env.execute("pay reconcile job");
}

private static class PayMatchDetection extends CoProcessFunction<OrderEvent, OrderPayCallBackEvent, Tuple2<OrderEvent, OrderPayCallBackEvent>> {
//定义状态,保存当前已经到来的订单支付事件和支付到账事件
ValueState<OrderEvent> orderPayState;
ValueState<OrderPayCallBackEvent> orderPayCallBackState;

@Override
public void open(Configuration parameters) throws Exception {
orderPayState = getRuntimeContext().getState(new ValueStateDescriptor<OrderEvent>("order_pay", OrderEvent.class));
orderPayCallBackState = getRuntimeContext().getState(new ValueStateDescriptor<OrderPayCallBackEvent>("order_pay_callBack", OrderPayCallBackEvent.class));
}

@Override
public void processElement1(OrderEvent orderEvent, Context context, Collector<Tuple2<OrderEvent, OrderPayCallBackEvent>> collector) throws Exception {
//订单支付事件到来时判断有没有对应的支付回调事件
OrderPayCallBackEvent orderPayCallBackEvent = orderPayCallBackState.value();
if (orderPayCallBackEvent != null) {
//如果支付回调不为空,说明支付成功回调事件已经到达,输出匹配结果,清空状态
collector.collect(new Tuple2<>(orderEvent, orderPayCallBackEvent));
orderPayCallBackState.clear();
orderPayState.clear();
} else {
//如果支付回调事件没来,注册一个5秒钟的定时器
context.timerService().registerEventTimeTimer((orderEvent.getTimeStamp() + 5) * 1000L);
//更新订单支付事件状态
orderPayState.update(orderEvent);
}
}

@Override
public void processElement2(OrderPayCallBackEvent orderPayCallBackEvent, Context context, Collector<Tuple2<OrderEvent, OrderPayCallBackEvent>> collector) throws Exception {
//订单支付回调事件到来时判断有没有对应的订单支付事件
OrderEvent orderEvent = orderPayState.value();
if (orderEvent != null) {
//如果订单支付事件不为空,说明订单支付事件已经到达,输出匹配结果,清空状态
collector.collect(new Tuple2<>(orderEvent, orderPayCallBackEvent));
orderPayCallBackState.clear();
orderPayState.clear();
} else {
//如果订单支付事件没来,注册一个2秒钟的定时器
context.timerService().registerEventTimeTimer((orderPayCallBackEvent.getTimeStamp() + 2) * 1000L);
//更新订单支付事件状态
orderPayCallBackState.update(orderPayCallBackEvent);
}
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<OrderEvent, OrderPayCallBackEvent>> out) throws Exception {
//触发定时器,为了省略删除定时器的操作,可以这样做:
//如果有一个事件不为空,则另一个对应的事件就还没到达,因为如果两个事件都到达会清空状态,导致两者同时为空
if (orderPayState.value() != null) {
ctx.output(unmatchedOrderPay, orderPayState.value());
}

if (orderPayCallBackState.value() != null) {
ctx.output(unmatchedOrderPayCallBack, orderPayCallBackState.value());
}

//清空状态
orderPayState.clear();
orderPayCallBackState.clear();
}
}
}

4)运行应用程序,输出对账成功的和对账失败的数据

1
2
3
unmatched-order-pays-callback> OrderPayCallBackEvent{transactId='2c53cw938', payChannel='wechat', timeStamp=1610543760}
unmatched-order-pays-callback> OrderPayCallBackEvent{transactId='gc6g3st15', payChannel='wechat', timeStamp=1610543860}
unmatched-orders-pay> OrderEvent{orderId=O9850005, eventType='pay', transactId='gc6g3st5', timeStamp=1610544600}

可见和之前的猜想一致。

面试题集锦

1.谈谈flink集群中都有哪些角色?它们都有什么作用?

Flink运行时架构由两种类型的进程组成:一个JobManager,以及多个TaskManager以及提交任务的Client。

JobManager进程由3个不同的组件构成:ResourceManager、Dispatcher、JobMaster。

JobManager在集群中充当管理者Master的角色,是整个集群的协调者,负责接收Flink Job,协调检查点,Failover故障恢复等。

TaskManager是具体执行计算的Worker,在其上执行Flink Job的一组Task,每个TaskManager负责管理其所在节点上的资源,比如内存、磁盘、网络,并将资源的状态向JobManager汇报。

Client是用来提交Flink应用程序的。当用户提交Flink应用程序时,会首先创建一个Client,该Client首先会对用户提交的Flink程序进行预处理,并提交到Flink集群中,Client从用户提交的Flink程序配置中获取JobManager的地址,并建立到JobManager的连接,将Flink Job提交给JobManager。

2.slot和parallelism的关系以及并行度是如何设置的?

一个算子(operator)的子任务(subtask)数,称之为该算子的并行度(parallelism)。

Slot指的是TaskManager能够提供的并发执行能力,是静态的并行能力的概念。而Parallelism则是TaskManager 实际会使用的并发能力,是动态的并行能力的概念。动态占用的资源总数必须小于静态能够提供的资源数。

parallelism有多种设置方法。

可以在算子层面设置,即代码中各个算子后面调用api直接设置。

可以在执行层面设置,即在Environment 中全局设置。

也可以在配置文件中设置,在flink-conf.yaml中配置。

还可以在命令行提交任务时 -p 指定。

3.聊聊Flink的反压机制

反压,即backpressure,也称作背压。通常在流计算处理链路中,如果数据输入的速度高于数据处理的速度,就会导致数据堆积。比如在Flink中,sink端处理数据的速度跟不上source端接收数据的速度,那么就会导致数据堆积,慢慢的,sink端的低效反过来给其上游链路造成了压力。如果说系统能感知到消息堆积,并调整消息发送(接收)的速度,使得数据的发送(接收)速度能和处理速度相协调,那么该系统就是具有背压感知机制的系统。Flink无疑就是具有背压应对机制的系统。

在Flink的官方博客中有这样的实验,将数据接收task的速度调整到其最大速度的60%,数据处理task以同样的速度处理。然后将数据处理task的速度降至其最高速度的30%,此时会产生背压问题,但是在Flink中,此时数据接收task的速度也自然降至其最高速度的30%。最后停止数据接收task的人为降速,数据接收task和数据消费task都达到了其最大速度。这就是flink中的反压处理机制,保证数据接收的速度不会快于数据消费的速度。Flink 使用了高效有界的分布式阻塞队列,类似Java中的通用阻塞队列(BlockingQueue)。下游消费者消费变慢,上游就会受到阻塞。Flink没有使用复杂的机制来处理反压问题, 而是利用自身作为纯数据流引擎的优势来优雅地处理反压问题。

4.端到端的exactly-once保证;如何保证消费kakfa数据不丢失

端到端的一致性指的是贯穿整个流处理应用的每一个组件都保证了其自身的一致性。

端到端的一致性要求数据源(source端)支持可重设数据的读取位置、从故障恢复时数据不会重复写入外设(sink端)。其次flink内部处理器使用checkpoint机制保证一致。

sink端一致性的具体实现有幂等写入和事务写入两种方式。事务性写入又有预写日志(WAL)和两阶段提交(2PC)两种方式。如果外部系统不支持事务,那么可以用预写日志的方式,把结果数据先当成状态保存,然后在收到 checkpoint 完成的通知时,一次性写入 sink 系统。

为了保证端到端的一致性,flink会维护消费kafka的偏移量,内部又有checkpoint机制,因此能够保证消费kafka数据不丢失。

5.说一下flink的状态机制、容错机制

官网对Flink的介绍是:一个用于无界和有界数据流上的有状态计算的分布式处理框架。因此Flink必然有一套状态管理机制来保证其状态的一致性。

在Flink中,状态与特定算子相关联。常用的就是Managed State下的Keyed State。Flink利用checkpoint机制对各个任务的状态进行快照保存,在故障恢复时能够保证状态一致性。Flink通过状态后端来管理状态以及对checkpoint存储,状态后端通常支持内存、分布式文件系统、嵌入式数据库rocksDB。

Flink的容错机制和状态管理是息息相关的。容错本质上就是对状态的容错,就是保证状态不丢失,后续可以重新读取状态恢复现场。

6.谈一下flink的watermark机制

watermark本质上是为了处理乱序或者延迟数据而引入整体的时间延迟机制。主要用来衡量EventTime的进展,能够在一定程度上容忍迟到的数据,能够将迟到的数据纳入正常的处理中。

7.讲讲flink中的几种时间语义

在flink中,有3中不同的时间语义

  • event time:事件创建的时间。比如用户点击按钮触发时。
  • ingestion time:数据进入flink的时间。即经过一系列的网络传输,进入flink source的时间。
  • processing time:flink执行operator操作时本地系统时间。从source进来到分配到TaskManager中的slot处理也是耗时的,比如数据重分区,因此存在理论上的时间差。即理论上processing time晚于ingestion time。

8.谈谈CEP的应用

CEP是复杂事件处理(Complex Event Processing)的简称。CEP 可以在无界事件流中检测匹配某种模式的事件,输出用户想得到的数据。CEP中的个体模式、组合模式、模式组能够让我们在处理复杂的业务逻辑时游刃有余。具体应用参见本文的订单系统实时对账。全文连载于CSDN博客。请访问个人主页:https://blog.csdn.net/hellozpc

9.讲讲你们公司的flink架构以及你们是如何提交实时任务的

我主要用公司的flink集群做司机车辆位置上报数据的处理,我们用的是腾讯云4核的4G的机器。共10台机器,随业务量的增加动态扩展。每台机器的slot数和cpu核数保持一致。我们使用的是standalone模式,并且配置了HA 高可用。我们通常有两种提交作业的方式。

1)直接将flink应用打成jar包,通过命令行或者页面提交任务

2)在代码中使用createRemoteEnvironment远程提交任务,可以集成到SpringBoot应用中

10.你们在使用flink时有没有使用到关系数据库存储数据,如何做的

我们一般不会直接将flink的数据写入到RDBMS中。一般先将处理结果写入kafka,rabbitmq中,再写入mysql。用消息队列来为关系型数据库降低压力。