Flink流式计算与批处理 —— 词频统计
也是因为最近处理业务上的问题,如果我希望看到某些实时数据的变化那么必然会涉及到流式计算,大数据处理框架从原始的 MapReduce、Hive 到 DAG的Tez,再到Spark的近实时处理是一个不断进化的过程,对于要求更高的场景还是Apache Flink比较在行,Apache Flink是一个带状态的流式计算框架,Flink不但能进行流式计算,也能进行批处理,也就是离线计算。下面是最近两天学习Flink的一些笔记,后面会逐步完善和校正。
大数据处理计算模式概览
流计算与批计算对比:
1、数据时效性不同:流式计算实时、低延迟,批量计算非实时、高延迟。
2、数据特征不同:流式计算的数据一般是动态的、没有边界的,而批处理的数据一般则是静态数据。
3、应用场景不同:流式计算应用在实时场景,时效性要求比较高的场景,如实时推荐、业务监控等批量计算一般 批处理应用在实时性要求不高、离线计算的场景下,数据分析、离线报表等。
4、运行方式不同:流式计算的任务持续进行的,批量计算的任务则一次性完成。
Apache Flink 概览
上图来自 Apache Flink 官网: https://flink.apache.org/ , 其实最好的学习途径就是先看官方文档,没有人比官方文档更加清晰易懂,看完了之后简单总结一下:
从上图得知:
1、数据来源与计算形式:Flink的数据来源可以是业务数据、日志、IOT采集数据、点击数据(可以理解为行为交互数据),实时事件数据支持流式计算,同时也支持从DB、KV、File中读取批量数据进行批处理。
2、事件驱动、支持数据管道 & ETL
3、支持 Hadoop Yarn、Apache Mesos、K8S部署,水平扩展性强,支持的的数据存储形式HDFS、NFS
4、处理完后的数据同样可以输出到应用、Log、DB或者KV
5、Flink 具备低延迟、高吞吐、基于内存计算等特点
Flink核心特性
1、统一数据处理组件栈,处理不同类型的数据需求,包括流计算、批处理、机器学习相关、图计算
2、时间模式:支持事件时间(Event Time),接入时间(Ingestion Time),处理时间(ProcessingTime)等时间概念
3、基于轻量级分布式快照的实现的容错机制
4、支持有状态计算:状态是 Flink 中的一等公民,Flink提供了多种状态类型:list、map、value
5、支持高度灵活的窗口操作
6、带反压的连续流支持,下游算子处理速度跟不上的时候通过Flow Control将信号传递给上游算子
7、基于JVM实现了独立的内存管理,只存储实际数据的二进制内容,节省空间,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好 ,提升性能的同时, 减少了Full GC和OOM的发生
8、分层次API:Table API / SQL 、 DataStream API ( 流操作的原语支持,Java 和 Scala )、 ProcessFunction
有界流 & 无界流概念
有界流 & 无界流在Flink中属于特别重要的概念,对于离线计算来讲,就是处理的有界流,也叫做批处理。流式计算通常来讲就是处理的无界流,即只有开始,没有结束,从运行的那一刻起就需要不断处理来自数据源的新数据。
构建Flink工程
Maven Archetype 构建
首先通过Maven来建立一个简易Flink工程,通过 Maven Archetype 的构建方式:
1mvn archetype:generate \
2 -DarchetypeGroupId=org.apache.flink \
3 -DarchetypeArtifactId=flink-walkthrough-datastream-java \
4 -DarchetypeVersion=1.14.4 \
5 -DgroupId=frauddetection \
6 -DartifactId=frauddetection \
7 -Dversion=0.1 \
8 -Dpackage=spendreport \
9 -DinteractiveMode=false
这样即可得到了一个示例工程,里面的依赖和示例代码可以参考,通过也有log4j的配置文件。
1public class FraudDetectionJob {
2 public static void main(String[] args) throws Exception {
3 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
4
5 // 拿数据
6 DataStream<Transaction> transactions = env
7 .addSource(new TransactionSource())
8 .name("transactions");
9 // 处理数据
10 DataStream<Alert> alerts = transactions
11 .keyBy(Transaction::getAccountId)
12 .process(new FraudDetector())
13 .name("fraud-detector");
14 // 结果写回
15 alerts
16 .addSink(new AlertSink())
17 .name("send-alerts");
18
19 env.execute("Fraud Detection");
20 }
21}
从上面的示例代码中可以的得到一个Flink编程范式,即拿数据、处理数据、再到结果输出的过程:
1、对接数据源 2、使用引擎进行逻辑处理 3、结果写回某处/输出
MR:InputFormat => Mapper => Reducer => ……
Hive:Table(s) => SQL =>insert…
Spark:RDD/DF/DS => Transaction> Action/Output
Flink:Source => Transaction => Sink / CH
这一点对于大部分大数据处理框架来说都是大同小异的!
Maven 模块化构建
第二个构建工程的方式就是 Maven modules 多模块方式构建,IDEA 直接新建 module即可,Maven子模块想要用父工程的依赖管理,首先在父工程的dependencyManagement 中定义好依赖(注意需要去掉依赖的作用域 scope),子工程再逐个引入(去除version、scope等信息),子工程不用再关心版本问题。
子工程 pom.xml
1<dependencies>
2 <dependency>
3 <groupId>org.apache.flink</groupId>
4 <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
5 </dependency>
6
7 <dependency>
8 <groupId>org.apache.flink</groupId>
9 <artifactId>flink-clients_${scala.binary.version}</artifactId>
10 </dependency>
11</dependencies>
父工程 pom.xml
1<dependencyManagement>
2 <dependencies>
3 <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
4 <dependency>
5 <groupId>org.apache.flink</groupId>
6 <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
7 <version>${flink.version}</version>
8 </dependency>
9 <dependency>
10 <groupId>org.apache.flink</groupId>
11 <artifactId>flink-clients_${scala.binary.version}</artifactId>
12 <version>${flink.version}</version>
13 </dependency>
14 </dependencies>
15</dependencyManagement>
Flink统计词频Demo
流式计算统计词频
对于流式计算得先找到一个数据流,这里简单用 nc
命令代替,nc
式类Unix下的一款非常强大的网络工具库,通过命令行输入如下命令就可以建立Socket服务并且通过终端把字符串发送至客户端:
1root@ubuntu:~# nc -lk 9527
2hello,hello,world,flink,flink,hello
3hello,hello,world,flink,flink,hello
这里9527是服务监听的端口号。
SteamingWCApp.java
1package cn.tim.flink;
2
3import org.apache.commons.lang3.StringUtils;
4import org.apache.flink.api.common.functions.FilterFunction;
5import org.apache.flink.api.common.functions.FlatMapFunction;
6import org.apache.flink.api.common.functions.MapFunction;
7import org.apache.flink.api.java.functions.KeySelector;
8import org.apache.flink.api.java.tuple.Tuple2;
9import org.apache.flink.streaming.api.datastream.DataStreamSource;
10import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11import org.apache.flink.util.Collector;
12
13/**
14 * 基于 Flink 流式处理 —— 词频统计
15 * 结合 Linux nc命令(功能强大的网络工具)作为 socket 数据流来源
16 * nc -lk 9527
17 * hello,hello,world,flink,flink,hello
18 */
19public class SteamingWCApp {
20 public static void main(String[] args) throws Exception {
21 // 上下文 -> 流处理的上下文
22 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
23
24 // 对接数据源,对接 Socket数据
25 DataStreamSource<String> source = env.socketTextStream("192.168.31.32", 9527);
26
27 // 业务逻辑处理
28 source.flatMap(new FlatMapFunction<String, String>() {
29 @Override
30 public void flatMap(String s, Collector<String> out) throws Exception {
31 String[] words = s.split(",");
32 for(String word: words) {
33 out.collect(word.toLowerCase().trim());
34 }
35 }
36 }).filter(new FilterFunction<String>() { // 空字符串过滤
37 @Override
38 public boolean filter(String s) throws Exception {
39 return StringUtils.isNotEmpty(s);
40 }
41 }).map(new MapFunction<String, Tuple2<String, Integer>>() {
42 @Override
43 public Tuple2<String, Integer> map(String s) throws Exception {
44 return new Tuple2<>(s, 1);
45 }
46 })
47 // .keyBy(0).sum(1) // 已过时:第一个位置做keyBy -> 0, 对第二个位置做求和操作 -> 1
48 .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
49 @Override
50 public String getKey(Tuple2<String, Integer> value) throws Exception {
51 return value.f0;
52 }
53 })
54 .sum(1)
55 .print();
56
57 env.execute("SteamingWCApp");
58 /*
59 * 7> (world,1)
60 * 4> (hello,1)
61 * 10> (flink,1)
62 * 10> (flink,2)
63 * 4> (hello,2)
64 * 4> (hello,3)
65 */
66 }
67}
批处理统计词频
Flink同样支持批处理,在统计词频的这个问题中,示例数据写在了 data/word.data这个文本文件里,内容就是一些单词,data/word.data 如下:
BatchWCApp.java
1package cn.tim.flink;
2
3import org.apache.commons.lang3.StringUtils;
4import org.apache.flink.api.common.functions.FilterFunction;
5import org.apache.flink.api.common.functions.FlatMapFunction;
6import org.apache.flink.api.common.functions.MapFunction;
7import org.apache.flink.api.java.ExecutionEnvironment;
8import org.apache.flink.api.java.operators.DataSource;
9import org.apache.flink.api.java.tuple.Tuple2;
10import org.apache.flink.util.Collector;
11
12/**
13 * 离线计算 —— 词频统计
14 */
15public class BatchWCApp {
16 public static void main(String[] args) throws Exception {
17 // 离线计算环境 -> 批处理的上下文
18 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
19
20 // 从文集读文本数据
21 DataSource<String> source = env.readTextFile("data/word.data");
22
23 // 设置过滤器和算子
24 source.flatMap(new FlatMapFunction<String, String>() {
25 @Override
26 public void flatMap(String s, Collector<String> collector) throws Exception {
27 String[] words = s.split(",");
28 for(String word: words) {
29 collector.collect(word.toLowerCase().trim());
30 }
31 }
32 }).filter(new FilterFunction<String>() {
33 @Override
34 public boolean filter(String s) throws Exception {
35 return StringUtils.isNotEmpty(s);
36 }
37 }).map(new MapFunction<String, Tuple2<String, Integer>>() {
38 @Override
39 public Tuple2<String, Integer> map(String s) throws Exception {
40 return new Tuple2<>(s, 1);
41 }
42 }).groupBy(0)
43 .sum(1)
44 .print();
45
46 /*
47 * (flink,9)
48 * (world,3)
49 * (hello,3)
50 */
51 // 批处理无需exec
52 }
53}
Github 对应代码仓库: https://github.com/zouchanglin/apache-flink/tree/main/modules-flink/flink-basic