编辑
2022-03-30
服务端技术
00
请注意,本文编写于 543 天前,最后修改于 105 天前,其中某些信息可能已经过时。

目录

大数据处理计算模式概览
Apache Flink 概览
Flink核心特性
有界流 & 无界流概念
构建Flink工程
Maven Archetype 构建
Maven 模块化构建
Flink统计词频Demo
流式计算统计词频
批处理统计词频

也是因为最近处理业务上的问题,如果我希望看到某些实时数据的变化那么必然会涉及到流式计算,大数据处理框架从原始的 MapReduce、Hive 到 DAG的Tez,再到Spark的近实时处理是一个不断进化的过程,对于要求更高的场景还是Apache Flink比较在行,Apache Flink是一个带状态的流式计算框架,Flink不但能进行流式计算,也能进行批处理,也就是离线计算。下面是最近两天学习Flink的一些笔记,后面会逐步完善和校正。

大数据处理计算模式概览

流计算与批计算对比:

1、数据时效性不同:流式计算实时、低延迟,批量计算非实时、高延迟。

2、数据特征不同:流式计算的数据一般是动态的、没有边界的,而批处理的数据一般则是静态数据。

3、应用场景不同:流式计算应用在实时场景,时效性要求比较高的场景,如实时推荐、业务监控等批量计算一般 批处理应用在实时性要求不高、离线计算的场景下,数据分析、离线报表等。

4、运行方式不同:流式计算的任务持续进行的,批量计算的任务则一次性完成。

上图来自 Apache Flink 官网:https://flink.apache.org/, 其实最好的学习途径就是先看官方文档,没有人比官方文档更加清晰易懂,看完了之后简单总结一下:

从上图得知:

1、数据来源与计算形式:Flink的数据来源可以是业务数据、日志、IOT采集数据、点击数据(可以理解为行为交互数据),实时事件数据支持流式计算,同时也支持从DB、KV、File中读取批量数据进行批处理。

2、事件驱动、支持数据管道 & ETL

3、支持 Hadoop Yarn、Apache Mesos、K8S部署,水平扩展性强,支持的的数据存储形式HDFS、NFS

4、处理完后的数据同样可以输出到应用、Log、DB或者KV

5、Flink 具备低延迟、高吞吐、基于内存计算等特点

Flink核心特性

1、统一数据处理组件栈,处理不同类型的数据需求,包括流计算、批处理、机器学习相关、图计算

2、时间模式:支持事件时间(Event Time),接入时间(Ingestion Time),处理时间(ProcessingTime)等时间概念

3、基于轻量级分布式快照的实现的容错机制

4、支持有状态计算:状态是 Flink 中的一等公民,Flink提供了多种状态类型:list、map、value

5、支持高度灵活的窗口操作

6、带反压的连续流支持,下游算子处理速度跟不上的时候通过Flow Control将信号传递给上游算子

7、基于JVM实现了独立的内存管理,只存储实际数据的二进制内容,节省空间,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好 ,提升性能的同时, 减少了Full GC和OOM的发生

8、分层次API:Table API / SQL 、 DataStream API ( 流操作的原语支持,Java 和 Scala )、 ProcessFunction

有界流 & 无界流概念

有界流 & 无界流在Flink中属于特别重要的概念,对于离线计算来讲,就是处理的有界流,也叫做批处理。流式计算通常来讲就是处理的无界流,即只有开始,没有结束,从运行的那一刻起就需要不断处理来自数据源的新数据。

构建Flink工程

Maven Archetype 构建

首先通过Maven来建立一个简易Flink工程,通过 Maven Archetype 的构建方式:

shell
mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-walkthrough-datastream-java \ -DarchetypeVersion=1.14.4 \ -DgroupId=frauddetection \ -DartifactId=frauddetection \ -Dversion=0.1 \ -Dpackage=spendreport \ -DinteractiveMode=false

这样即可得到了一个示例工程,里面的依赖和示例代码可以参考,通过也有log4j的配置文件。

java
public class FraudDetectionJob { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 拿数据 DataStream<Transaction> transactions = env .addSource(new TransactionSource()) .name("transactions"); // 处理数据 DataStream<Alert> alerts = transactions .keyBy(Transaction::getAccountId) .process(new FraudDetector()) .name("fraud-detector"); // 结果写回 alerts .addSink(new AlertSink()) .name("send-alerts"); env.execute("Fraud Detection"); } }

从上面的示例代码中可以的得到一个Flink编程范式,即拿数据、处理数据、再到结果输出的过程:

1、对接数据源 2、使用引擎进行逻辑处理 3、结果写回某处/输出

MR:InputFormat => Mapper => Reducer => ……

Hive:Table(s) => SQL =>insert...

Spark:RDD/DF/DS => Transaction> Action/Output

Flink:Source => Transaction => Sink / CH

这一点对于大部分大数据处理框架来说都是大同小异的!

Maven 模块化构建

第二个构建工程的方式就是 Maven modules 多模块方式构建,IDEA 直接新建 module即可,Maven子模块想要用父工程的依赖管理,首先在父工程的dependencyManagement 中定义好依赖(注意需要去掉依赖的作用域 scope),子工程再逐个引入(去除version、scope等信息),子工程不用再关心版本问题。

子工程 pom.xml

xml
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> </dependency> </dependencies>

父工程 pom.xml

xml
<dependencyManagement> <dependencies> <!-- This dependency is provided, because it should not be packaged into the JAR file. --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> </dependencies> </dependencyManagement>

Flink统计词频Demo

流式计算统计词频

对于流式计算得先找到一个数据流,这里简单用 nc 命令代替,nc 式类Unix下的一款非常强大的网络工具库,通过命令行输入如下命令就可以建立Socket服务并且通过终端把字符串发送至客户端:

shell
root@ubuntu:~# nc -lk 9527 hello,hello,world,flink,flink,hello hello,hello,world,flink,flink,hello

这里9527是服务监听的端口号。

SteamingWCApp.java

java
package cn.tim.flink; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; /** * 基于 Flink 流式处理 —— 词频统计 * 结合 Linux nc命令(功能强大的网络工具)作为 socket 数据流来源 * nc -lk 9527 * hello,hello,world,flink,flink,hello */ public class SteamingWCApp { public static void main(String[] args) throws Exception { // 上下文 -> 流处理的上下文 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 对接数据源,对接 Socket数据 DataStreamSource<String> source = env.socketTextStream("192.168.31.32", 9527); // 业务逻辑处理 source.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> out) throws Exception { String[] words = s.split(","); for(String word: words) { out.collect(word.toLowerCase().trim()); } } }).filter(new FilterFunction<String>() { // 空字符串过滤 @Override public boolean filter(String s) throws Exception { return StringUtils.isNotEmpty(s); } }).map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } }) // .keyBy(0).sum(1) // 已过时:第一个位置做keyBy -> 0, 对第二个位置做求和操作 -> 1 .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey(Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .sum(1) .print(); env.execute("SteamingWCApp"); /* * 7> (world,1) * 4> (hello,1) * 10> (flink,1) * 10> (flink,2) * 4> (hello,2) * 4> (hello,3) */ } }

批处理统计词频

Flink同样支持批处理,在统计词频的这个问题中,示例数据写在了 data/word.data这个文本文件里,内容就是一些单词,data/word.data 如下:

flink,flink,hello,world,flink flink,flink,hello,world,flink flink,flink,hello,world,flink

BatchWCApp.java

java
package cn.tim.flink; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; /** * 离线计算 —— 词频统计 */ public class BatchWCApp { public static void main(String[] args) throws Exception { // 离线计算环境 -> 批处理的上下文 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 从文集读文本数据 DataSource<String> source = env.readTextFile("data/word.data"); // 设置过滤器和算子 source.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String s, Collector<String> collector) throws Exception { String[] words = s.split(","); for(String word: words) { collector.collect(word.toLowerCase().trim()); } } }).filter(new FilterFunction<String>() { @Override public boolean filter(String s) throws Exception { return StringUtils.isNotEmpty(s); } }).map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String s) throws Exception { return new Tuple2<>(s, 1); } }).groupBy(0) .sum(1) .print(); /* * (flink,9) * (world,3) * (hello,3) */ // 批处理无需exec } }

Github 对应代码仓库:https://github.com/zouchanglin/apache-flink/tree/main/modules-flink/flink-basic

本文作者:Tim

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!