也是因为最近处理业务上的问题,如果我希望看到某些实时数据的变化那么必然会涉及到流式计算,大数据处理框架从原始的 MapReduce、Hive 到 DAG的Tez,再到Spark的近实时处理是一个不断进化的过程,对于要求更高的场景还是Apache Flink比较在行,Apache Flink是一个带状态的流式计算框架,Flink不但能进行流式计算,也能进行批处理,也就是离线计算。下面是最近两天学习Flink的一些笔记,后面会逐步完善和校正。
大数据处理计算模式概览
流计算与批计算对比:
1、数据时效性不同:流式计算实时、低延迟,批量计算非实时、高延迟。
2、数据特征不同:流式计算的数据一般是动态的、没有边界的,而批处理的数据一般则是静态数据。
3、应用场景不同:流式计算应用在实时场景,时效性要求比较高的场景,如实时推荐、业务监控等批量计算一般 批处理应用在实时性要求不高、离线计算的场景下,数据分析、离线报表等。
4、运行方式不同:流式计算的任务持续进行的,批量计算的任务则一次性完成。
Apache Flink 概览 上图来自 Apache Flink 官网:https://flink.apache.org/ , 其实最好的学习途径就是先看官方文档,没有人比官方文档更加清晰易懂,看完了之后简单总结一下:
从上图得知:
1、数据来源与计算形式:Flink的数据来源可以是业务数据、日志、IOT采集数据、点击数据(可以理解为行为交互数据),实时事件数据支持流式计算,同时也支持从DB、KV、File中读取批量数据进行批处理。
2、事件驱动、支持数据管道 & ETL
3、支持 Hadoop Yarn、Apache Mesos、K8S部署,水平扩展性强,支持的的数据存储形式HDFS、NFS
4、处理完后的数据同样可以输出到应用、Log、DB或者KV
5、Flink 具备低延迟、高吞吐、基于内存计算等特点
Flink核心特性 1、统一数据处理组件栈,处理不同类型的数据需求,包括流计算、批处理、机器学习相关、图计算
2、时间模式:支持事件时间(Event Time),接入时间(Ingestion Time),处理时间(ProcessingTime)等时间概念
3、基于轻量级分布式快照的实现的容错机制
4、支持有状态计算:状态是 Flink 中的一等公民,Flink提供了多种状态类型:list、map、value
5、支持高度灵活的窗口操作
6、带反压的连续流支持,下游算子处理速度跟不上的时候通过Flow Control将信号传递给上游算子
7、基于JVM实现了独立的内存管理,只存储实际数据的二进制内容,节省空间,该二进制形式可以把相关的值,以及hash值,键值和指针等相邻地放进内存中。这使得数据结构可以对高速缓存更友好 ,提升性能的同时, 减少了Full GC和OOM的发生
8、分层次API:Table API / SQL 、 DataStream API ( 流操作的原语支持,Java 和 Scala )、 ProcessFunction
有界流 & 无界流概念
有界流 & 无界流在Flink中属于特别重要的概念,对于离线计算来讲,就是处理的有界流,也叫做批处理。流式计算通常来讲就是处理的无界流,即只有开始,没有结束,从运行的那一刻起就需要不断处理来自数据源的新数据。
构建Flink工程 Maven Archetype 构建 首先通过Maven来建立一个简易Flink工程,通过 Maven Archetype 的构建方式:
1 2 3 4 5 6 7 8 9 mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-walkthrough-datastream-java \ -DarchetypeVersion=1.14.4 \ -DgroupId=frauddetection \ -DartifactId=frauddetection \ -Dversion=0.1 \ -Dpackage=spendreport \ -DinteractiveMode=false
这样即可得到了一个示例工程,里面的依赖和示例代码可以参考,通过也有log4j的配置文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class FraudDetectionJob { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Transaction> transactions = env .addSource(new TransactionSource()) .name("transactions" ); DataStream<Alert> alerts = transactions .keyBy(Transaction::getAccountId) .process(new FraudDetector()) .name("fraud-detector" ); alerts .addSink(new AlertSink()) .name("send-alerts" ); env.execute("Fraud Detection" ); } }
从上面的示例代码中可以的得到一个Flink编程范式,即拿数据、处理数据、再到结果输出的过程:
1、对接数据源 2、使用引擎进行逻辑处理 3、结果写回某处/输出
MR:InputFormat => Mapper => Reducer => ……
Hive:Table(s) => SQL =>insert…
Spark:RDD/DF/DS => Transaction> Action/Output
Flink:Source => Transaction => Sink / CH
这一点对于大部分大数据处理框架来说都是大同小异的!
Maven 模块化构建 第二个构建工程的方式就是 Maven modules 多模块方式构建,IDEA 直接新建 module即可,Maven子模块想要用父工程的依赖管理,首先在父工程的dependencyManagement 中定义好依赖(注意需要去掉依赖的作用域 scope),子工程再逐个引入(去除version、scope等信息),子工程不用再关心版本问题。
子工程 pom.xml
1 2 3 4 5 6 7 8 9 10 11 <dependencies > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-streaming-java_${scala.binary.version}</artifactId > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-clients_${scala.binary.version}</artifactId > </dependency > </dependencies >
父工程 pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 <dependencyManagement > <dependencies > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-streaming-java_${scala.binary.version}</artifactId > <version > ${flink.version}</version > </dependency > <dependency > <groupId > org.apache.flink</groupId > <artifactId > flink-clients_${scala.binary.version}</artifactId > <version > ${flink.version}</version > </dependency > </dependencies > </dependencyManagement >
Flink统计词频Demo 流式计算统计词频 对于流式计算得先找到一个数据流,这里简单用 nc
命令代替,nc
式类Unix下的一款非常强大的网络工具库,通过命令行输入如下命令就可以建立Socket服务并且通过终端把字符串发送至客户端:
1 2 3 root@ubuntu:~# nc -lk 9527 hello,hello,world,flink,flink,hello hello,hello,world,flink,flink,hello
这里9527是服务监听的端口号。
SteamingWCApp.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package cn.tim.flink;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.util.Collector;public class SteamingWCApp { public static void main (String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> source = env.socketTextStream("192.168.31.32" , 9527 ); source.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap (String s, Collector<String> out) throws Exception { String[] words = s.split("," ); for (String word: words) { out.collect(word.toLowerCase().trim()); } } }).filter(new FilterFunction<String>() { @Override public boolean filter (String s) throws Exception { return StringUtils.isNotEmpty(s); } }).map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map (String s) throws Exception { return new Tuple2<>(s, 1 ); } }) .keyBy(new KeySelector<Tuple2<String, Integer>, String>() { @Override public String getKey (Tuple2<String, Integer> value) throws Exception { return value.f0; } }) .sum(1 ) .print(); env.execute("SteamingWCApp" ); } }
批处理统计词频 Flink同样支持批处理,在统计词频的这个问题中,示例数据写在了 data/word.data这个文本文件里,内容就是一些单词,data/word.data 如下:
1 2 3 flink,flink,hello,world,flink flink,flink,hello,world,flink flink,flink,hello,world,flink
BatchWCApp.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package cn.tim.flink;import org.apache.commons.lang3.StringUtils;import org.apache.flink.api.common.functions.FilterFunction;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.java.ExecutionEnvironment;import org.apache.flink.api.java.operators.DataSource;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.util.Collector;public class BatchWCApp { public static void main (String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); DataSource<String> source = env.readTextFile("data/word.data" ); source.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap (String s, Collector<String> collector) throws Exception { String[] words = s.split("," ); for (String word: words) { collector.collect(word.toLowerCase().trim()); } } }).filter(new FilterFunction<String>() { @Override public boolean filter (String s) throws Exception { return StringUtils.isNotEmpty(s); } }).map(new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map (String s) throws Exception { return new Tuple2<>(s, 1 ); } }).groupBy(0 ) .sum(1 ) .print(); } }
Github 对应代码仓库:https://github.com/zouchanglin/apache-flink/tree/main/modules-flink/flink-basic