Flink流式计算与批处理 —— 词频统计

也是因为最近处理业务上的问题,如果我希望看到某些实时数据的变化那么必然会涉及到流式计算,大数据处理框架从原始的 MapReduce、Hive 到 DAG的Tez,再到Spark的近实时处理是一个不断进化的过程,对于要求更高的场景还是Apache Flink比较在行,Apache Flink是一个带状态的流式计算框架,Flink不但能进行流式计算,也能进行批处理,也就是离线计算。下面是最近两天学习Flink的一些笔记,后面会逐步完善和校正。

大数据处理计算模式概览

流计算与批计算对比:

1、数据时效性不同:流式计算实时、低延迟,批量计算非实时、高延迟。

2、数据特征不同:流式计算的数据一般是动态的、没有边界的,而批处理的数据一般则是静态数据。

3、应用场景不同:流式计算应用在实时场景,时效性要求比较高的场景,如实时推荐、业务监控等批量计算一般 批处理应用在实时性要求不高、离线计算的场景下,数据分析、离线报表等。

4、运行方式不同:流式计算的任务持续进行的,批量计算的任务则一次性完成。

上图来自 Apache Flink 官网: https://flink.apache.org/ , 其实最好的学习途径就是先看官方文档,没有人比官方文档更加清晰易懂,看完了之后简单总结一下:

从上图得知:

1、数据来源与计算形式:Flink的数据来源可以是业务数据、日志、IOT采集数据、点击数据(可以理解为行为交互数据),实时事件数据支持流式计算,同时也支持从DB、KV、File中读取批量数据进行批处理。

2、事件驱动、支持数据管道 & ETL

3、支持 Hadoop Yarn、Apache Mesos、K8S部署,水平扩展性强,支持的的数据存储形式HDFS、NFS

4、处理完后的数据同样可以输出到应用、Log、DB或者KV

5、Flink 具备低延迟、高吞吐、基于内存计算等特点

Flink核心特性

1、统一数据处理组件栈,处理不同类型的数据需求,包括流计算、批处理、机器学习相关、图计算

2、时间模式:支持事件时间(Event Time),接入时间(Ingestion Time),处理时间(ProcessingTime)等时间概念

3、基于轻量级分布式快照的实现的容错机制

4、支持有状态计算:状态是 Flink 中的一等公民,Flink提供了多种状态类型:list、map、value

5、支持高度灵活的窗口操作

6、带反压的连续流支持,下游算子处理速度跟不上的时候通过Flow Control将信号传递给上游算子

7、基于JVM实现了独立的内存管理,只存储实际数据的二进制内容,节省空间,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好 ,提升性能的同时, 减少了Full GC和OOM的发生

8、分层次API:Table API / SQL 、 DataStream API ( 流操作的原语支持,Java 和 Scala )、 ProcessFunction

有界流 & 无界流概念

有界流 & 无界流在Flink中属于特别重要的概念,对于离线计算来讲,就是处理的有界流,也叫做批处理。流式计算通常来讲就是处理的无界流,即只有开始,没有结束,从运行的那一刻起就需要不断处理来自数据源的新数据。

构建Flink工程

Maven Archetype 构建

首先通过Maven来建立一个简易Flink工程,通过 Maven Archetype 的构建方式:

1mvn archetype:generate \
2    -DarchetypeGroupId=org.apache.flink \
3    -DarchetypeArtifactId=flink-walkthrough-datastream-java \
4    -DarchetypeVersion=1.14.4 \
5    -DgroupId=frauddetection \
6    -DartifactId=frauddetection \
7    -Dversion=0.1 \
8    -Dpackage=spendreport \
9    -DinteractiveMode=false

这样即可得到了一个示例工程,里面的依赖和示例代码可以参考,通过也有log4j的配置文件。

 1public class FraudDetectionJob {
 2   public static void main(String[] args) throws Exception {
 3      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 4      
 5      // 拿数据
 6      DataStream<Transaction> transactions = env
 7         .addSource(new TransactionSource())
 8         .name("transactions");
 9      // 处理数据
10      DataStream<Alert> alerts = transactions
11         .keyBy(Transaction::getAccountId)
12         .process(new FraudDetector())
13         .name("fraud-detector");
14      // 结果写回
15      alerts
16         .addSink(new AlertSink())
17         .name("send-alerts");
18
19      env.execute("Fraud Detection");
20   }
21}

从上面的示例代码中可以的得到一个Flink编程范式,即拿数据、处理数据、再到结果输出的过程:

1、对接数据源 2、使用引擎进行逻辑处理 3、结果写回某处/输出

MR:InputFormat => Mapper => Reducer => ……

Hive:Table(s) => SQL =>insert…

Spark:RDD/DF/DS => Transaction> Action/Output

Flink:Source => Transaction => Sink / CH

这一点对于大部分大数据处理框架来说都是大同小异的!

Maven 模块化构建

第二个构建工程的方式就是 Maven modules 多模块方式构建,IDEA 直接新建 module即可,Maven子模块想要用父工程的依赖管理,首先在父工程的dependencyManagement 中定义好依赖(注意需要去掉依赖的作用域 scope),子工程再逐个引入(去除version、scope等信息),子工程不用再关心版本问题。

子工程 pom.xml

 1<dependencies>
 2    <dependency>
 3        <groupId>org.apache.flink</groupId>
 4        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 5    </dependency>
 6
 7    <dependency>
 8        <groupId>org.apache.flink</groupId>
 9        <artifactId>flink-clients_${scala.binary.version}</artifactId>
10    </dependency>
11</dependencies>

父工程 pom.xml

 1<dependencyManagement>
 2    <dependencies>
 3        <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
 4        <dependency>
 5            <groupId>org.apache.flink</groupId>
 6            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
 7            <version>${flink.version}</version>
 8        </dependency>
 9        <dependency>
10            <groupId>org.apache.flink</groupId>
11            <artifactId>flink-clients_${scala.binary.version}</artifactId>
12            <version>${flink.version}</version>
13        </dependency>
14    </dependencies>
15</dependencyManagement>

Flink统计词频Demo

流式计算统计词频

对于流式计算得先找到一个数据流,这里简单用 nc 命令代替,nc 式类Unix下的一款非常强大的网络工具库,通过命令行输入如下命令就可以建立Socket服务并且通过终端把字符串发送至客户端:

1root@ubuntu:~# nc -lk 9527
2hello,hello,world,flink,flink,hello
3hello,hello,world,flink,flink,hello

这里9527是服务监听的端口号。

SteamingWCApp.java

 1package cn.tim.flink;
 2
 3import org.apache.commons.lang3.StringUtils;
 4import org.apache.flink.api.common.functions.FilterFunction;
 5import org.apache.flink.api.common.functions.FlatMapFunction;
 6import org.apache.flink.api.common.functions.MapFunction;
 7import org.apache.flink.api.java.functions.KeySelector;
 8import org.apache.flink.api.java.tuple.Tuple2;
 9import org.apache.flink.streaming.api.datastream.DataStreamSource;
10import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
11import org.apache.flink.util.Collector;
12
13/**
14 * 基于 Flink 流式处理 —— 词频统计
15 * 结合 Linux nc命令(功能强大的网络工具)作为 socket 数据流来源
16 * nc -lk 9527
17 * hello,hello,world,flink,flink,hello
18 */
19public class SteamingWCApp {
20    public static void main(String[] args) throws Exception {
21        // 上下文 -> 流处理的上下文
22        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
23
24        // 对接数据源,对接 Socket数据
25        DataStreamSource<String> source = env.socketTextStream("192.168.31.32", 9527);
26
27        // 业务逻辑处理
28        source.flatMap(new FlatMapFunction<String, String>() {
29            @Override
30            public void flatMap(String s, Collector<String> out) throws Exception {
31                String[] words = s.split(",");
32                for(String word: words) {
33                    out.collect(word.toLowerCase().trim());
34                }
35            }
36        }).filter(new FilterFunction<String>() { // 空字符串过滤
37            @Override
38            public boolean filter(String s) throws Exception {
39                return StringUtils.isNotEmpty(s);
40            }
41        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
42            @Override
43            public Tuple2<String, Integer> map(String s) throws Exception {
44                return new Tuple2<>(s, 1);
45            }
46        })
47        // .keyBy(0).sum(1) // 已过时:第一个位置做keyBy -> 0, 对第二个位置做求和操作 -> 1
48        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
49            @Override
50            public String getKey(Tuple2<String, Integer> value) throws Exception {
51                return value.f0;
52            }
53        })
54        .sum(1)
55        .print();
56        
57        env.execute("SteamingWCApp");
58        /*
59         * 7> (world,1)
60         * 4> (hello,1)
61         * 10> (flink,1)
62         * 10> (flink,2)
63         * 4> (hello,2)
64         * 4> (hello,3)
65         */
66    }
67}

批处理统计词频

Flink同样支持批处理,在统计词频的这个问题中,示例数据写在了 data/word.data这个文本文件里,内容就是一些单词,data/word.data 如下:

1flink,flink,hello,world,flink
2flink,flink,hello,world,flink
3flink,flink,hello,world,flink

BatchWCApp.java

 1package cn.tim.flink;
 2
 3import org.apache.commons.lang3.StringUtils;
 4import org.apache.flink.api.common.functions.FilterFunction;
 5import org.apache.flink.api.common.functions.FlatMapFunction;
 6import org.apache.flink.api.common.functions.MapFunction;
 7import org.apache.flink.api.java.ExecutionEnvironment;
 8import org.apache.flink.api.java.operators.DataSource;
 9import org.apache.flink.api.java.tuple.Tuple2;
10import org.apache.flink.util.Collector;
11
12/**
13 * 离线计算 —— 词频统计
14 */
15public class BatchWCApp {
16    public static void main(String[] args) throws Exception {
17        // 离线计算环境 -> 批处理的上下文
18        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
19
20        // 从文集读文本数据
21        DataSource<String> source = env.readTextFile("data/word.data");
22
23        // 设置过滤器和算子
24        source.flatMap(new FlatMapFunction<String, String>() {
25            @Override
26            public void flatMap(String s, Collector<String> collector) throws Exception {
27                String[] words = s.split(",");
28                for(String word: words) {
29                    collector.collect(word.toLowerCase().trim());
30                }
31            }
32        }).filter(new FilterFunction<String>() {
33            @Override
34            public boolean filter(String s) throws Exception {
35                return StringUtils.isNotEmpty(s);
36            }
37        }).map(new MapFunction<String, Tuple2<String, Integer>>() {
38            @Override
39            public Tuple2<String, Integer> map(String s) throws Exception {
40                return new Tuple2<>(s, 1);
41            }
42        }).groupBy(0)
43          .sum(1)
44          .print();
45
46        /*
47         * (flink,9)
48         * (world,3)
49         * (hello,3)
50         */
51        // 批处理无需exec
52    }
53}

Github 对应代码仓库: https://github.com/zouchanglin/apache-flink/tree/main/modules-flink/flink-basic