大数据计算框架 Flink 实战教程共分为3篇。涵盖 Flink 安装配置、基本原理、核心概念、 流处理 API 和批处理 API、存储及状态一致性、容错机制、实战案例以及面试题讲解等。
本篇为第2篇,主要讲解 Flink 的 API、Window 和 Watermark 机制、状态管理和存储、容错机制、状态一致性等等。第1篇看这里 。第3篇看这里 。
文章目录
大数据计算框架 Flink 实战教程共分为3篇。涵盖 Flink 安装配置、基本原理、核心概念、 流处理 API 和批处理 API、存储及状态一致性、容错机制、实战案例以及面试题讲解等。
本篇为第2篇,主要讲解 Flink 的 API、Window 和 Watermark 机制、状态管理和存储、容错机制、状态一致性等等。第1篇看这里 。
FlinkAPI Environment 执行Flink 程序首先要判断flink环境。Flink 中有3种获取执行环境的方式。
1)getExecutionEnvironment
获取当前执行程序的上下文。如果是直接在IDEA中运行的JAVA代码,则此方法返回本地执行环境。如果是从命令行或web页面提交flink任务到集群中,则此方法返回的是集群执行环境。这种方式是最常用的,Flink底层帮我们判断具体调用本地还是远程环境。
1 2 ExecutionEnvironment.getExecutionEnvironment();//获取批处理执行环境 StreamExecutionEnvironment.getExecutionEnvironment(); //获取流处理执行环境
2)createLocalEnvironment
直接返回本地执行环境,这种方式可以指定并行度。如不指定则使用当前机器可用cpu核数作为并行度。其实第1种方式判断当前环境是本地环境的话,底层也会调此方法。
1 ExecutionEnvironment.createLocalEnvironment();
3)createRemoteEnvironment
获取远程集群执行环境。如果将Jar包提交到远程Flink集群执行则需指定JobManager的IP和port,并指定jar包路径
1 ExecutionEnvironment.createRemoteEnvironment(hostname,port,"hdfs://wordCount.jar");
Source source是Flink 应用程序的数据来源。作为一款通用的数据处理框架,flink既可以处理静态的历史数据集,也可以处理实时的流式数据。流式计算场景下只要数据源源不断传入,flink就能一直处理。下面讲解Flink中的几种数据输入方式。
1)从本地集合中读取
1 2 3 executionEnvironment.fromCollection(Arrays.asList("a", "b", "c","d"));//从JAVA Collection中读取数据 executionEnvironment.fromElements(1, 2, 3, 4);//从给定的对象序列中读取数据
2)从文件中读取
1 2 String inputPath = "F:\\data\\file"; executionEnvironment.readTextFile(inputPath);//使用默认的文件格式
3)从socket中读取
1 2 3 env.socketTextStream("localhost", 9999);//从指定的IP地址和端口处读取数据,使用默认行分隔符 env.socketTextStream(hostname, port, delimiter);//指定行分隔符
4)从Kafka中读取
实际开发中,Kafka作为Flink数据源非常常见,可以说Kafka和Flink在流式数据处理领域是天生的一对。
引入Kafka 连接器pom依赖,连接器的版本和Flink 版本保持一致
1 2 3 4 5 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.9.2</version> </dependency>
Flink中添加kafka数据源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置参数 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.174.129:9092"); props.put("zookeeper.connect", "192.168.174.129:2181"); //props.put("group.id", "metric-group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("auto.offset.reset", "latest"); //FlinkKafkaConsumer011 表示对应的kafka版本是0.11.x DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "test01", //kafka topic new SimpleStringSchema(), // String 序列化 props)).setParallelism(1); dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台 env.execute("Flink add kafka data source"); }
addSource是一般化的添加数据源的算子,前面几种source都是Flink根据特定应用场景封装好的算子,底层还是调用了addSource。
可以测试一下上述程序。在linux服务器上启动kafka集群,并通过命令行运行一个Producer发送消息。查看Flink是否消费到数据。
Kafka相关教程可以参考这篇文章:《Kafka 实战教程》
5)自定义source
有时为了方便测试Flink应用程序,我们需要手动造数据,这就要用到自定义数据源。
自定义的DataSource只要实现org.apache.flink.streaming.api.functions.source.SourceFunction接口即可被作为数据源添加。
下面的例子展示了如何自定义数据源。需求是实现一个实时数字生成器,1秒钟产生1个自增数字发送到Flink。Flink收到数据后放大两倍输出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 /** * 自定义Flink数据源,重写SourceFunction的run和cancel方法 */ import org.apache.flink.streaming.api.functions.source.SourceFunction; public class MyDataSource implements SourceFunction<Integer> { private boolean isRunning = true; /** * run方法里编写数据产生逻辑 * @param ctx * @throws Exception */ @Override public void run(SourceContext<Integer> ctx) throws Exception { int i = 1; while (isRunning) { ctx.collect(i); i++; Thread.sleep(1000); } } @Override public void cancel() { isRunning = false; } } public class MyDataSourceTest { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> mySource = env.addSource(new MyDataSource()); SingleOutputStreamOperator<Integer> res = mySource.map(e -> 2 * e); res.print(); env.execute("Flink add dataSource"); } }
掌握了自定义数据源的使用有助于实际开发,比如可以写个实时读取MySql数据源的工具!
Transform也可称为Operator,翻译为中文为“算子”,其实就是数据转换操作。下面讲解Flink中的几种数据转换操作,先从流式处理,即DataStream 操作讲起。批处理与之类似。
1)Map
Map就是映射,顾名思义,就是将输入数据进行转换操作。
map算子的输入参数是一个MapFunction,我们只要实现它重写其中的map函数即可
1 2 3 public interface MapFunction<T, O> extends Function, Serializable { O map(T value) throws Exception; }
比如将商品数据流中的每个商品价格翻倍:
1 2 3 4 5 6 7 8 SingleOutputStreamOperator<Product> map = dataStreamSource.map(new MapFunction<Product, Product>() { @Override public Product map(Product product) throws Exception { product.price = product.price * 2; return product; } }); map.print();
对于简单的转换操作我们也可以直接使用lambda 表达式,比如.map(e -> 2 * e);
1 2 3 4 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> mySource = env.addSource(new MyDataSource()); SingleOutputStreamOperator<Integer> res = mySource.map(e -> 2 * e);//把数据源中的每个元素放大2倍 res.print();
2)FlatMap
FlatMap意指扁平化的map,即将每个元素map后的数据打散,重新组成一个“宽”的集合。和JDK8中的flatMap本质一样。
FlatMap的输入参数是一个FlatMapFunction,只要重写其flatMap方法即可,value是输入数据,out是输出数据收集器
1 2 3 4 5 6 7 8 9 10 11 12 13 public interface FlatMapFunction<T, O> extends Function, Serializable { void flatMap(T value, Collector<O> out) throws Exception; } dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { //接收一个字符串(Wordcount中表示一行数据),输出一个2元组 @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] splits = value.split("\\s"); for (String word : splits) { out.collect(new Tuple2<String, Integer>(word, 1)); } } })
FlatMap和Map的区别在第一篇 快速入门案例中已讲解,此处不再赘述。
3)Filter
对元素进行过滤,重写FilterFunction的filter实现过滤逻辑,简单的过滤逻辑可以直接使用lambda表达式
1 2 3 public interface FilterFunction<T> extends Function, Serializable { boolean filter(T value) throws Exception; }
例如过滤价格超过100的商品
1 2 3 4 5 6 7 8 9 10 SingleOutputStreamOperator<Product> res = mySource.filter(new FilterFunction<Product>() { @Override public boolean filter(Product product) throws Exception { if (product.price >= 100) { return true; } return false; } }) res.print();
4)KeyBy
根据指定的key对流数据元素进行分区,底层基于hash算法,hashCode相同的key被分到同一个分区,即分到下游算子并行节点中的一个。比如快速入门案例中,flatMap之后的数据按照单词分组,即按照二元组数据的第一个字段:Tuple2.f0 分组
1 2 3 DataStream<Tuple2<String, Integer>> dataStream = dataStreamSource .flatMap(new Splitter()) .keyBy(value -> value.f0)
keyBy的参数是KeySelector<IN, KEY>,前一个泛型表示来源数据类型,后一个泛型表示从原数据中提取出来的key的类型
再比如根据商品的品牌来分组
1 2 3 4 5 6 7 KeyedStream<Product, String> keyByedProd = productStream.keyBy(new KeySelector<Product, String>() { @Override public String getKey(Product product) throws Exception { return product.brand; } }); keyByedProd.print();
简写:.keyBy(product-> product.brand)
5)Reduce
reduce俗称“约减”,就是将元素进行聚合处理。常见的sum、min、max、count、average等聚合操作都可以使用原生的reduce实现。reduce算子的入参是ReduceFunction,value1表示前一个元素,value2表示后一个元素,reduce方法是具体的数据处理逻辑。reduce操作实质上就是不断地将数据源中两个值合并为同一类型的一个值,reduce函数连续应用于输入数据流中的所有值,直到只剩下一个值(聚合之后的结果)。
1 2 3 public interface ReduceFunction<T> extends Function, Serializable { T reduce(T value1, T value2) throws Exception; }
比如统计各个品牌商品的总价
1 2 3 4 5 6 7 8 9 10 11 12 13 SingleOutputStreamOperator<Product> reduceRes = productStream.keyBy(new KeySelector<Product, String>() { @Override public String getKey(Product product) throws Exception { return product.brand; } }).reduce(new ReduceFunction<Product>() { @Override public Product reduce(Product product1, Product product2) throws Exception { product2.price = (product1.price + product2.price); return product2; } }); reduceRes.print();
6)Aggregation
Flink中支持对数据流的各种聚合操作,并封装了很多聚集函数。像min、max、sum等聚集函数都可以应用于 KeyedStream获得聚合结果。聚合算子参数如果是int类型,则表示聚合字段的下标(从0开始)。比如快速入门案例中对二元组数据求和:.sum(1)表示求二元组中第二个字段(单词计数)的和。如果是string类型,则表示聚合字段名,通常是一个pojo对象的public属性。
1 2 3 4 5 6 7 8 9 10 KeyedStream.sum(0) KeyedStream.sum("field0") KeyedStream.min(1) KeyedStream.min("field1") KeyedStream.max(2) KeyedStream.max("field2") KeyedStream.minBy(3) KeyedStream.minBy("field3") KeyedStream.maxBy(4) KeyedStream.maxBy("field4")
7)Split和Select
Split是根据指定条件将数据流拆分为两个或多个流,可以单独处理每个数据流。Select是从拆分的流中选择特定的流。select和split一般结合使用,正如keyBy和聚集函数一起使用一样。实现这样的需求:按照商品价格比如100元为界将商品分为优品(>100)和良品。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public static void main(String[] args) throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); List<Product> products = new ArrayList<>(); products.add(new Product("A", "阿迪", 990)); products.add(new Product("B", "安踏", 90)); products.add(new Product("C", "耐克", 880)); products.add(new Product("D", "特步", 80)); DataStreamSource<Product> streamSource = executionEnvironment.fromCollection(products); SplitStream<Product> splitStream = streamSource.split(new OutputSelector<Product>() { @Override public Iterable<String> select(Product product) { List<String> list = new ArrayList<>();//使用list作为临时数据结构存储标签 if (product.getPrice() > 100) { list.add("优品"); } else { list.add("良品"); } return list; } }); DataStream<Product> superiorProducts = splitStream.select("优品"); DataStream<Product> acceptedProducts = splitStream.select("良品"); DataStream<Product> allProducts = splitStream.select("良品","优品"); superiorProducts.print("优品"); //启动计算任务 executionEnvironment.execute("Stream operator"); }
控制台输出“优品”的数据:
1 2 优品:1> Product{name='C', brand='耐克', price=880.0} 优品:4> Product{name='A', brand='阿迪', price=990.0}
有分流操作,那么与之对应的必然有合流操作。Flink中合流操作有2种:Union和Connect。
8)Union
Union函数表示将两个或多个数据类型相同的流组合在一起,即求并集。
1 2 3 4 5 StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> dataStream1 = executionEnvironment.fromCollection(Arrays.asList("a", "b", "c", "d")); DataStreamSource<String> dataStream2 = executionEnvironment.fromCollection(Arrays.asList("1", "2", "3", "4")); DataStream<String> union = dataStream2.union(dataStream1); union.print();
控制台输出合并之后的数据:
1 2 3 4 5 6 7 8 3> b 4> c 1> d 1> 3 4> 2 2> a 3> 1 2> 4
9)connect和CoMap
两个datastream连接后转变成connectedstreams,即:datastream,datastream->connectedstreams。与union不同的是connect不要求被连接的两个流数据类型相同。两个流虽然被connect到了同一个流中,但是合并之后的流内部依然保持各自的数据格式不变,相互独立。connect通常和coMap一起使用,coMap对connect之后的流做数据处理。
实际应用中一个数据流过来可能先根据元素的某种特征分开处理,到了一定阶段又需要合并处理,此时就需要用到分流和合流操作。
实现这样的需求:连接一个二元组类型的数据流和一个Product的数据流。并分别对连接后的数据流做map操作。
还是沿用之前的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public static void main(String[] args) throws Exception { StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); List<Product> products = new ArrayList<>(); products.add(new Product("A", "阿迪", 999)); products.add(new Product("B", "安踏", 99)); products.add(new Product("C", "耐克", 888)); products.add(new Product("D", "特步", 89)); DataStreamSource<Product> streamSource = executionEnvironment.fromCollection(products); SplitStream<Product> splitStream = streamSource.split(new OutputSelector<Product>() { @Override public Iterable<String> select(Product product) { List<String> list = new ArrayList<>(); if (product.getPrice() > 100) { list.add("优品"); } else { list.add("良品"); } return list; } }); DataStream<Product> superiorProducts = splitStream.select("优品"); DataStream<Product> acceptedProducts = splitStream.select("良品"); //先将优品数据转换为2元组类型 DataStream<Tuple2<String, Double>> superiorProductsStream = superiorProducts.map(new MapFunction<Product, Tuple2<String, Double>>() { @Override public Tuple2<String, Double> map(Product product) throws Exception { return new Tuple2<>(product.getName(), product.getPrice()); } }); //连接二元组数据流和Product数据类型,并做算子操作,都转换为3元组 SingleOutputStreamOperator<Object> operator = superiorProductsStream.connect(acceptedProducts).map(new CoMapFunction<Tuple2<String, Double>, Product, Object>() { @Override public Object map1(Tuple2 value) throws Exception { return new Tuple3<>(value.f0, value.f1, "优品"); } @Override public Object map2(Product value) throws Exception { return new Tuple3<>(value.getName(), value.getPrice(), "良品"); } }); operator.print("coMap"); //启动计算任务 executionEnvironment.execute("Stream operator"); }
控制台输出如下内容,说明案例中不同数据类型的流连接(connect)、处理(coMap)成功。
1 2 3 4 coMap:1> (A,999.0,优品) coMap:2> (C,888.0,优品) coMap:4> (D,89.0,良品) coMap:3> (B,99.0,良品)
观察上面的介绍的几种操作可以总结一些规律。
比如keyBy操作总是和聚集函数一起使用、split通常和select一起使用、connect和coMap一起使用。datastream split后得到splitstream,再select之后又转换为datastream ;同样的,datastream connect之后得到connectedstreams,再经coMap操作后又转换为datastream 。union合并的两个流数据类型必须相同,合并过程不涉及流类型的转换。而connect不要求数据流的元素类型相同。union操作可以操作多个流,connect操作只能操作两个流。
上述介绍的 DataStream(流处理) 数据转换操作中,有些也适合DataSet(批处理)。比如 Map、FlatMap、Reduce、Filter 等。
当然DataSet也有一些特有算子。
比如在DataStream中分区是 KeyBy,而DataSet中是GroupBy。这在快速入门案例中已经演示,不再赘述。
DataSet有个first(n)方法可以返回DataSet中前 n个元素,比如:env.readTextFile(inputPath).first(2);返回数据集中前2个元素。
Flink数据类型 前文中介绍Flink中算子的使用时提到了数据类型,下面简单介绍一下Flink中所支持的数据类型。
Flink应用程序处理的是由数据对象组成的连续不断的数据流。这些数据对象需要被序列化和反序列化,以便能够通过网络传输以及从检查点、保存点、状态后端存储读取。为了明确应用程序所处理的数据类型,Flink底层提供了一套完备的数据类型信息,并且为每一种类型提供了序列化器、反序列化器以及比较器。
此外,Flink还提供了类型提取系统,自动分析函数的输入类型和输出类型,以获得对应的序列化器和反序列化器。在使用lambda函数或者泛型类型时,需显式指定类型信息。
Flink DataStream里的元素类型支持JAVA和Scala中的所有基本类型,像Int、Long、Double、String等。此外还支持Tuple元组类型、Java简单对象(pojo)、scala样例类以及一些集合类型,比如Java的ArrayList、HashMap、Enum等。
Flink的每个函数都提供了对应的Rich版本。富函数相比普通的函数可以获取flink运行时上下文、生命周期方法。生命周期方法中通常可以做一些初始化及收尾操作,比如连接数据库、关闭数据库连接。
Sink sink,顾名思义,下沉,在Flink中意指数据输出、数据落地的意思。最简单的数据输出方式就是打印到控制台,调用datastream的print()方法即可,print就是一种sink操作。对于不同的sink方式,Flink提供了各种内置的输出格式。
除了基本的输入输出数据源外,flink目前还支持下列第三方组件作为数据源。
本节介绍几种常用的数据输出方式。
1)普通文件、socket
writeAsText()/TextOutputFormat:将元素按行写入字符串。字符串通过调用每个元素的toString()方法获得。
writeAsCsv(…)/CsvOutputFormat:将数据以逗号分隔的形式写入文件。换行符和字段分隔符可配置。每个字段的值来自对象的toString()方法。
writeUsingOutputFormat() / FileOutputFormat:自定义文件输出格式,支持自定义对象到字节的转换。
writeToSocket:根据指定格式(Serialization Schema)将元素写入网络套接字。
举例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class SinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度为1,输出结果全部写出到一个文件,否则开发环境会使用默认并行度分区,分区数为当前机器逻辑cpu核数 env.setParallelism(1); //准备数据源 List<Tuple2<String, Integer>> list = new ArrayList<>(); list.add(new Tuple2<>("A", 100)); list.add(new Tuple2<>("B", 200)); list.add(new Tuple2<>("C", 300)); list.add(new Tuple2<>("D", 400)); DataStreamSource<Tuple2<String, Integer>> dataStreamSource = env.fromCollection(list); dataStreamSource.print(); //除了路径参数是必填外,还可以通过指定第二个参数来定义输出模式 dataStreamSource.writeAsText("d://sink-text.txt", FileSystem.WriteMode.OVERWRITE); //如果想要将输出结果全部写出到一个文件,可以单独设置算子的并行度为 1 dataStreamSource.writeAsCsv("d://sink-csv.txt", FileSystem.WriteMode.OVERWRITE, "\n", ",").setParallelism(1); //自定义的输出格式,writeAsText/writeAsCsv底层调用的都是该方法 dataStreamSource.writeUsingOutputFormat(new TextOutputFormat(new Path("d://sink-file.txt"), "UTF-8")); //以字符串的形式输出到socket服务器 dataStreamSource.map(t -> t.f0 + ":" + t.f1 + "\r\n").writeToSocket("192.168.244.131", 9999, new SimpleStringSchema()); env.execute("sink demo"); } }
测试socket输出时,先在linux服务器上使用nc -lk 9999 模拟socket服务器开启监听。
2)kafka kafka和flink天生对流式数据友好,因此实际生产中经常搭配使用。比如flink从数据源接收到数据处理完成后再发送一个消息到kafka中,任其消费。也有从kafka进、kafka出的使用场景,即输入、输出源都是kafka。比如对原始输出数据进行分流处理,并且处理完成后发送到不同的消费者topic中去。下面介绍如何在flink中集成kafka。
引入依赖
1 2 3 4 5 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.11.0</version> </dependency>
需求:实现Flink消费kafka消息队列的消息,转换处理后再次输出到kafka中。
实体类Product
1 2 3 4 5 6 public class Product { private String name; private String brand; private double price; //省略get/set }
通过linux命令行创建2个topic,一个由flink消费,另一个由flink写入。
1 2 3 4 5 6 7 8 9 10 bin/kafka-topics.sh --create \ --bootstrap-server 192.168.244.131:9092 \ --replication-factor 1 \ --partitions 1 \ --topic flink-stream-in-topic bin/kafka-topics.sh --create \ --bootstrap-server 192.168.244.131:9092 \ --replication-factor 1 \ --partitions 1 \ --topic flink-stream-out-topic
查看topic: bin/kafka-topics.sh --list --bootstrap-server 192.168.244.131:9092
启动一个消费者,接收flink的输出
1 bin/kafka-console-consumer.sh --bootstrap-server 192.168.244.131:9092 --topic flink-stream-out-topic
启动一个生产者,向flink应用程序监听的topic发送消息
1 bin/kafka-console-producer.sh --topic flink-stream-in-topic --bootstrap-server 192.168.244.131:9092
在生产者端输入json串:{"name":"跑鞋","brand":"Nike","price":1000}
Flink应用程序集成Kafka:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class KafkaSink { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //kafka配置 Properties props = new Properties(); props.put("bootstrap.servers", "192.168.244.131:9092"); props.put("zookeeper.connect", "192.168.244.131:2181"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//value 反序列化 props.put("auto.offset.reset", "latest"); //从kafka中消费数据 DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>( "flink-stream-in-topic", //kafka topic new SimpleStringSchema(), // String序列化 props)).setParallelism(1);//并行度一般不超过kafka topic分区数 dataStreamSource.print(); //把从 kafka 读取到的数据打印在控制台 //对数据进行业务处理 SingleOutputStreamOperator<String> streamOperated = dataStreamSource.map(new MapFunction<String, String>() { @Override public String map(String value) throws Exception { Product product = JSON.parseObject(value, Product.class); //将商品价格翻倍 product.setPrice(product.getPrice()*2); return JSON.toJSONString(product); } }); //将处理完的数据再次发送到kafka中 streamOperated.addSink(new FlinkKafkaProducer011<>( "flink-stream-out-topic", new SimpleStringSchema(), props)).setParallelism(1); env.execute("kafka data source"); } }
运行flink应用后,kafka消费者端将收到处理后的数据:
1 {"brand":"Nike","name":"跑鞋","price":2000.0}
3)redis
Redis Connector 用于向 Redis 发送数据。可以使用三种不同的方法与不同类型的 Redis 环境进行通信
单 Redis 服务器
Redis 集群
Redis Sentinel(哨兵)
不同模式主要是Config类的不同,本例展示了单机模式下Flink写入redis
引入redis连接器依赖
1 2 3 4 5 <dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
编写flink应用代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class RedisSinkDemo { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //准备数据源,简单起见,这里使用本地集合数据 List<Tuple2<String, String>> list = new ArrayList<>(); list.add(new Tuple2<>("A", "apple")); list.add(new Tuple2<>("B", "bird")); list.add(new Tuple2<>("C", "cat")); list.add(new Tuple2<>("D", "dog")); DataStreamSource<Tuple2<String, String>> dataStreamSource = env.fromCollection(list); //单机Redis配置,这里只简单配置ip/端口,还支持其它配置比如maxTotal、maxIdle、timeout FlinkJedisPoolConfig redisConf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build(); //数据写入redis dataStreamSource.addSink(new RedisSink<>(redisConf, new RedisMapper<Tuple2<String, String>>() { @Override public RedisCommandDescription getCommandDescription() { //指定redis命令,这里只演示最简单的设置字符串key return new RedisCommandDescription(RedisCommand.SET); } @Override public String getKeyFromData(Tuple2<String, String> data) { //提取要存到redis的key return data.f0; } @Override public String getValueFromData(Tuple2<String, String> data) { //提取要存到redis的value return data.f1; } })); env.execute("redis data sink"); } }